HBASE-16630 Handle Fragmentation in bucket cache

Currently whenever a compaction/bulkload happen and the
blocks are evicted from theirs buckets the buckets become
fragmented and are not available to be used by other
BucketSizes

Bug Fix : Added Memory block type also to the list of
evictions that need to happen when there is a needForExtra

Improvement : Inorder to fix the non availabilty of Buckets and force
the movement of buckets to transformed sizes, whenever we encounter a
situation where an allocation cant be made for a BucketSize, we will
forcefully free the entire buckets that have least occupancy ratio. This
is the same strategy used by MemCached when they encounter a similar
issue going by the name 'Slab Calcification'. Only improvement is that
we use a heuristic to evict from the buckets that are least occupied
and also avoid the BucketSizes where there is a single Bucket

Change-Id: I9e3b4deb8d893953003ddf5f1e66312ed97ea9cb

Signed-off-by: Ramkrishna <ramkrishna.s.vasudevan@intel.com>
This commit is contained in:
dvdreddy 2016-09-20 17:16:06 -07:00 committed by Ramkrishna
parent 62de29e6f2
commit a8fd1119ce
2 changed files with 110 additions and 20 deletions

View File

@ -20,13 +20,16 @@
package org.apache.hadoop.hbase.io.hfile.bucket; package org.apache.hadoop.hbase.io.hfile.bucket;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import com.google.common.collect.MinMaxPriorityQueue;
import org.apache.commons.collections.map.LinkedMap; import org.apache.commons.collections.map.LinkedMap;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -583,4 +586,45 @@ public final class BucketAllocator {
return sz; return sz;
} }
public int getBucketIndex(long offset) {
return (int) (offset / bucketCapacity);
}
/**
* Returns a set of indices of the buckets that are least filled
* excluding the offsets, we also the fully free buckets for the
* BucketSizes where everything is empty and they only have one
* completely free bucket as a reserved
*
* @param excludedBuckets the buckets that need to be excluded due to
* currently being in used
* @param bucketCount max Number of buckets to return
* @return set of bucket indices which could be used for eviction
*/
public Set<Integer> getLeastFilledBuckets(Set<Integer> excludedBuckets,
int bucketCount) {
Queue<Integer> queue = MinMaxPriorityQueue.<Integer>orderedBy(
new Comparator<Integer>() {
@Override
public int compare(Integer left, Integer right) {
// We will always get instantiated buckets
return Float.compare(
((float) buckets[left].usedCount) / buckets[left].itemCount,
((float) buckets[right].usedCount) / buckets[right].itemCount);
}
}).maximumSize(bucketCount).create();
for (int i = 0; i < buckets.length; i ++ ) {
if (!excludedBuckets.contains(i) && !buckets[i].isUninstantiated() &&
// Avoid the buckets that are the only buckets for a sizeIndex
bucketSizeInfos[buckets[i].sizeIndex()].bucketList.size() != 1) {
queue.add(i);
}
}
Set<Integer> result = new HashSet<>(bucketCount);
result.addAll(queue);
return result;
}
} }

View File

@ -31,6 +31,7 @@ import java.io.Serializable;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Comparator; import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -107,6 +108,9 @@ public class BucketCache implements BlockCache, HeapSize {
private static final float DEFAULT_ACCEPT_FACTOR = 0.95f; private static final float DEFAULT_ACCEPT_FACTOR = 0.95f;
private static final float DEFAULT_MIN_FACTOR = 0.85f; private static final float DEFAULT_MIN_FACTOR = 0.85f;
// Number of blocks to clear for each of the bucket size that is full
private static final int DEFAULT_FREE_ENTIRE_BLOCK_FACTOR = 2;
/** Statistics thread */ /** Statistics thread */
private static final int statThreadPeriod = 5 * 60; private static final int statThreadPeriod = 5 * 60;
@ -629,6 +633,53 @@ public class BucketCache implements BlockCache, HeapSize {
* DEFAULT_MIN_FACTOR); * DEFAULT_MIN_FACTOR);
} }
/**
* Return the count of bucketSizeinfos still needf ree space
*/
private int bucketSizesAboveThresholdCount(float minFactor) {
BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics();
int fullCount = 0;
for (int i = 0; i < stats.length; i++) {
long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - minFactor));
freeGoal = Math.max(freeGoal, 1);
if (stats[i].freeCount() < freeGoal) {
fullCount++;
}
}
return fullCount;
}
/**
* This method will find the buckets that are minimally occupied
* and are not reference counted and will free them completely
* without any constraint on the access times of the elements,
* and as a process will completely free at most the number of buckets
* passed, sometimes it might not due to changing refCounts
*
* @param completelyFreeBucketsNeeded number of buckets to free
**/
private void freeEntireBuckets(int completelyFreeBucketsNeeded) {
if (completelyFreeBucketsNeeded != 0) {
// First we will build a set where the offsets are reference counted, usually
// this set is small around O(Handler Count) unless something else is wrong
Set<Integer> inUseBuckets = new HashSet<Integer>();
for (BucketEntry entry : backingMap.values()) {
if (entry.refCount.get() != 0) {
inUseBuckets.add(bucketAllocator.getBucketIndex(entry.offset()));
}
}
Set<Integer> candidateBuckets = bucketAllocator.getLeastFilledBuckets(
inUseBuckets, completelyFreeBucketsNeeded);
for (Map.Entry<BlockCacheKey, BucketEntry> entry : backingMap.entrySet()) {
if (candidateBuckets.contains(bucketAllocator
.getBucketIndex(entry.getValue().offset()))) {
evictBlock(entry.getKey(), false);
}
}
}
}
/** /**
* Free the space if the used size reaches acceptableSize() or one size block * Free the space if the used size reaches acceptableSize() or one size block
* couldn't be allocated. When freeing the space, we use the LRU algorithm and * couldn't be allocated. When freeing the space, we use the LRU algorithm and
@ -725,27 +776,14 @@ public class BucketCache implements BlockCache, HeapSize {
remainingBuckets--; remainingBuckets--;
} }
/** // Check and free if there are buckets that still need freeing of space
* Check whether need extra free because some bucketSizeinfo still needs if (bucketSizesAboveThresholdCount(DEFAULT_MIN_FACTOR) > 0) {
* free space
*/
stats = bucketAllocator.getIndexStatistics();
boolean needFreeForExtra = false;
for (int i = 0; i < stats.length; i++) {
long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - DEFAULT_MIN_FACTOR));
freeGoal = Math.max(freeGoal, 1);
if (stats[i].freeCount() < freeGoal) {
needFreeForExtra = true;
break;
}
}
if (needFreeForExtra) {
bucketQueue.clear(); bucketQueue.clear();
remainingBuckets = 2; remainingBuckets = 3;
bucketQueue.add(bucketSingle); bucketQueue.add(bucketSingle);
bucketQueue.add(bucketMulti); bucketQueue.add(bucketMulti);
bucketQueue.add(bucketMemory);
while ((bucketGroup = bucketQueue.poll()) != null) { while ((bucketGroup = bucketQueue.poll()) != null) {
long bucketBytesToFree = (bytesToFreeWithExtra - bytesFreed) / remainingBuckets; long bucketBytesToFree = (bytesToFreeWithExtra - bytesFreed) / remainingBuckets;
@ -754,6 +792,14 @@ public class BucketCache implements BlockCache, HeapSize {
} }
} }
// Even after the above free we might still need freeing because of the
// De-fragmentation of the buckets (also called Slab Calcification problem), i.e
// there might be some buckets where the occupancy is very sparse and thus are not
// yielding the free for the other bucket sizes, the fix for this to evict some
// of the buckets, we do this by evicting the buckets that are least fulled
freeEntireBuckets(DEFAULT_FREE_ENTIRE_BLOCK_FACTOR *
bucketSizesAboveThresholdCount(1.0f));
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
long single = bucketSingle.totalSize(); long single = bucketSingle.totalSize();
long multi = bucketMulti.totalSize(); long multi = bucketMulti.totalSize();