diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java
index 47776076a44..73197b9eeb2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java
@@ -20,12 +20,16 @@ package org.apache.hadoop.hbase.io.hfile.bucket;

-import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;

+import com.google.common.collect.MinMaxPriorityQueue;
 import org.apache.commons.collections.map.LinkedMap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -44,7 +48,7 @@ import com.google.common.primitives.Ints;
  * when evicting. It manages an array of buckets, each bucket is associated with
  * a size and caches elements up to this size. For a completely empty bucket, this
  * size could be re-specified dynamically.
- * 
+ *
  * This class is not thread safe.
  */
 @InterfaceAudience.Private
@@ -581,4 +585,45 @@ public final class BucketAllocator {
     return sz;
   }

+  public int getBucketIndex(long offset) {
+    return (int) (offset / bucketCapacity);
+  }
+
+  /**
+   * Returns a set of indices of the buckets that are least filled,
+   * excluding the given buckets. Bucket sizes where everything is empty
+   * keep their one completely free bucket as a reserve, so the sole
+   * bucket of a size is never offered up for eviction.
+   *
+   * @param excludedBuckets the buckets that need to be excluded because
+   *                        they are currently in use
+   * @param bucketCount     max number of buckets to return
+   * @return set of bucket indices which could be used for eviction
+   */
+  public Set<Integer> getLeastFilledBuckets(Set<Integer> excludedBuckets,
+                                            int bucketCount) {
+    Queue<Integer> queue = MinMaxPriorityQueue.orderedBy(
+        new Comparator<Integer>() {
+          @Override
+          public int compare(Integer left, Integer right) {
+            // We will always get instantiated buckets
+            return Float.compare(
+                ((float) buckets[left].usedCount) / buckets[left].itemCount,
+                ((float) buckets[right].usedCount) / buckets[right].itemCount);
+          }
+        }).maximumSize(bucketCount).create();
+
+    for (int i = 0; i < buckets.length; i++) {
+      if (!excludedBuckets.contains(i) && !buckets[i].isUninstantiated() &&
+          // Avoid the buckets that are the only buckets for a sizeIndex
+          bucketSizeInfos[buckets[i].sizeIndex()].bucketList.size() != 1) {
+        queue.add(i);
+      }
+    }
+
+    Set<Integer> result = new HashSet<>(bucketCount);
+    result.addAll(queue);
+
+    return result;
+  }
 }
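The selection above leans on Guava's MinMaxPriorityQueue: with maximumSize(bucketCount), adding an element beyond the cap silently drops the greatest one, so after scanning every bucket the queue holds only the bucketCount smallest fill ratios. A minimal standalone sketch of that pattern follows; the usedCount/itemCount arrays and their values are illustrative, not from the patch.

import java.util.Comparator;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;

import com.google.common.collect.MinMaxPriorityQueue;

public class LeastFilledDemo {
  public static void main(String[] args) {
    // Hypothetical per-bucket occupancy, standing in for
    // buckets[i].usedCount / buckets[i].itemCount in the patch.
    final int[] usedCount = { 9, 1, 5, 0, 7 };
    final int[] itemCount = { 10, 10, 10, 10, 10 };

    // Bounded by maximumSize: once the cap is reached, each add
    // expels the greatest element, keeping the smallest fill ratios.
    Queue<Integer> queue = MinMaxPriorityQueue.orderedBy(
        new Comparator<Integer>() {
          @Override
          public int compare(Integer left, Integer right) {
            return Float.compare(
                ((float) usedCount[left]) / itemCount[left],
                ((float) usedCount[right]) / itemCount[right]);
          }
        }).maximumSize(2).create();

    for (int i = 0; i < usedCount.length; i++) {
      queue.add(i);
    }

    Set<Integer> result = new HashSet<>(queue);
    System.out.println(result); // e.g. [1, 3], the two emptiest buckets
  }
}

Copying the queue into a HashSet, as the patch does, is enough because the caller only needs membership tests on the candidate bucket indices, not their ordering.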
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index 5e6f8dda0f5..0380f18236f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -31,6 +31,7 @@ import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Comparator;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -104,6 +105,9 @@ public class BucketCache implements BlockCache, HeapSize {
   private static final float DEFAULT_ACCEPT_FACTOR = 0.95f;
   private static final float DEFAULT_MIN_FACTOR = 0.85f;

+  // Number of blocks to clear for each bucket size that is full
+  private static final int DEFAULT_FREE_ENTIRE_BLOCK_FACTOR = 2;
+
   /** Statistics thread */
   private static final int statThreadPeriod = 5 * 60;

@@ -566,6 +570,53 @@ public class BucketCache implements BlockCache, HeapSize {
         * DEFAULT_MIN_FACTOR);
   }

+  /**
+   * Return the count of bucketSizeInfos that still need free space
+   */
+  private int bucketSizesAboveThresholdCount(float minFactor) {
+    BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics();
+    int fullCount = 0;
+    for (int i = 0; i < stats.length; i++) {
+      long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - minFactor));
+      freeGoal = Math.max(freeGoal, 1);
+      if (stats[i].freeCount() < freeGoal) {
+        fullCount++;
+      }
+    }
+    return fullCount;
+  }
+
+  /**
+   * This method will find the buckets that are minimally occupied
+   * and whose blocks are not currently referenced, and will free them
+   * completely, without any constraint on the access times of the
+   * elements. It frees at most the number of buckets passed in; it may
+   * free fewer, since refCounts can change while it runs.
+   *
+   * @param completelyFreeBucketsNeeded number of buckets to free
+   **/
+  private void freeEntireBuckets(int completelyFreeBucketsNeeded) {
+    if (completelyFreeBucketsNeeded != 0) {
+      // First build the set of buckets holding reference-counted blocks;
+      // usually this set is small, around O(handler count), unless something else is wrong
+      Set<Integer> inUseBuckets = new HashSet<Integer>();
+      for (BucketEntry entry : backingMap.values()) {
+        if (entry.refCount.get() != 0) {
+          inUseBuckets.add(bucketAllocator.getBucketIndex(entry.offset()));
+        }
+      }
+
+      Set<Integer> candidateBuckets = bucketAllocator.getLeastFilledBuckets(
+          inUseBuckets, completelyFreeBucketsNeeded);
+      for (Map.Entry<BlockCacheKey, BucketEntry> entry : backingMap.entrySet()) {
+        if (candidateBuckets.contains(bucketAllocator
+            .getBucketIndex(entry.getValue().offset()))) {
+          evictBlock(entry.getKey(), false);
+        }
+      }
+    }
+  }
+
   /**
    * Free the space if the used size reaches acceptableSize() or one size block
    * couldn't be allocated. When freeing the space, we use the LRU algorithm and
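To make the threshold arithmetic concrete, here is a worked sketch of the freeGoal computation used by bucketSizesAboveThresholdCount. The totals and free counts are made up; note that with minFactor = 0.85f, float rounding makes 1 - minFactor come out just under 0.15, so 100 buckets floor to a goal of 14 rather than 15.

public class FreeGoalDemo {
  public static void main(String[] args) {
    float minFactor = 0.85f;
    long[] totalCount = { 100, 40, 3 };  // buckets per size
    long[] freeCount  = { 20, 2, 1 };    // free buckets per size

    int fullCount = 0;
    for (int i = 0; i < totalCount.length; i++) {
      // At least (1 - minFactor) of each size's buckets should be free,
      // and never less than one bucket.
      long freeGoal = Math.max(
          (long) Math.floor(totalCount[i] * (1 - minFactor)), 1);
      if (freeCount[i] < freeGoal) {
        fullCount++; // this bucket size still needs freeing
      }
    }
    // freeGoals come out as 14, 5, 1; only the middle size falls
    // short (2 < 5), so this prints 1.
    System.out.println(fullCount);
  }
}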
@@ -662,27 +713,14 @@ public class BucketCache implements BlockCache, HeapSize {
       remainingBuckets--;
     }

-    /**
-     * Check whether need extra free because some bucketSizeinfo still needs
-     * free space
-     */
-    stats = bucketAllocator.getIndexStatistics();
-    boolean needFreeForExtra = false;
-    for (int i = 0; i < stats.length; i++) {
-      long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - DEFAULT_MIN_FACTOR));
-      freeGoal = Math.max(freeGoal, 1);
-      if (stats[i].freeCount() < freeGoal) {
-        needFreeForExtra = true;
-        break;
-      }
-    }
-
-    if (needFreeForExtra) {
+    // Check and free more space if some bucket sizes still need free space
+    if (bucketSizesAboveThresholdCount(DEFAULT_MIN_FACTOR) > 0) {
       bucketQueue.clear();
-      remainingBuckets = 2;
+      remainingBuckets = 3;

       bucketQueue.add(bucketSingle);
       bucketQueue.add(bucketMulti);
+      bucketQueue.add(bucketMemory);

       while ((bucketGroup = bucketQueue.poll()) != null) {
         long bucketBytesToFree = (bytesToFreeWithExtra - bytesFreed) / remainingBuckets;
@@ -691,6 +729,14 @@ public class BucketCache implements BlockCache, HeapSize {
       }
     }

+    // Even after the free above we might still need more freeing, because
+    // of bucket fragmentation (also known as the slab calcification
+    // problem): some buckets may be so sparsely occupied that they never
+    // yield free space back to the other bucket sizes. The fix is to evict
+    // some buckets entirely, so we evict the buckets that are least filled.
+    freeEntireBuckets(DEFAULT_FREE_ENTIRE_BLOCK_FACTOR *
+        bucketSizesAboveThresholdCount(1.0f));
+
     if (LOG.isDebugEnabled()) {
       long single = bucketSingle.totalSize();
       long multi = bucketMulti.totalSize();
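Putting the pieces together, the defragmentation pass is: collect the buckets pinned by a nonzero refCount, ask the allocator for the least-filled unpinned buckets, then evict every block that lives in one of them. A simplified, self-contained model of that flow follows; Entry, the local backingMap, and the eviction via removeIf are stand-ins for BucketEntry, the cache's real backingMap, and evictBlock.

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

public class FreeEntireBucketsDemo {
  static final long BUCKET_CAPACITY = 4 * 1024 * 1024; // illustrative

  // Stand-in for BucketCache.BucketEntry: an offset plus a refCount.
  static final class Entry {
    final long offset;
    final AtomicInteger refCount = new AtomicInteger();
    Entry(long offset) { this.offset = offset; }
  }

  static int bucketIndex(long offset) {
    return (int) (offset / BUCKET_CAPACITY);
  }

  public static void main(String[] args) {
    Map<String, Entry> backingMap = new HashMap<>();
    backingMap.put("block-a", new Entry(0));                // bucket 0
    backingMap.put("block-b", new Entry(BUCKET_CAPACITY));  // bucket 1
    backingMap.get("block-b").refCount.incrementAndGet();   // pinned

    // 1. Buckets holding any referenced block must not be freed.
    Set<Integer> inUse = new HashSet<>();
    for (Entry e : backingMap.values()) {
      if (e.refCount.get() != 0) {
        inUse.add(bucketIndex(e.offset));
      }
    }

    // 2. In the patch this comes from getLeastFilledBuckets(inUse, n);
    //    here we simply take every unpinned bucket as a candidate.
    Set<Integer> candidates = new HashSet<>();
    for (Entry e : backingMap.values()) {
      int idx = bucketIndex(e.offset);
      if (!inUse.contains(idx)) {
        candidates.add(idx);
      }
    }

    // 3. Evict every block that lives in a candidate bucket.
    backingMap.entrySet().removeIf(
        e -> candidates.contains(bucketIndex(e.getValue().offset)));

    System.out.println(backingMap.keySet()); // [block-b] survives
  }
}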