HBASE-17819 Reduce the heap overhead for BucketCache.

This commit is contained in:
anoopsamjohn 2018-03-25 16:36:30 +05:30
parent 9ea1a7d422
commit 3f7222df36
5 changed files with 167 additions and 13 deletions

View File

@ -37,7 +37,7 @@ public final class UnsafeAccess {
private static final Logger LOG = LoggerFactory.getLogger(UnsafeAccess.class); private static final Logger LOG = LoggerFactory.getLogger(UnsafeAccess.class);
static final Unsafe theUnsafe; public static final Unsafe theUnsafe;
/** The offset to the first element in a byte array. */ /** The offset to the first element in a byte array. */
public static final long BYTE_ARRAY_BASE_OFFSET; public static final long BYTE_ARRAY_BASE_OFFSET;

View File

@ -72,6 +72,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.HasThread; import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.hbase.util.IdReadWriteLock; import org.apache.hadoop.hbase.util.IdReadWriteLock;
import org.apache.hadoop.hbase.util.IdReadWriteLock.ReferenceType; import org.apache.hadoop.hbase.util.IdReadWriteLock.ReferenceType;
import org.apache.hadoop.hbase.util.UnsafeAvailChecker;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -517,7 +518,7 @@ public class BucketCache implements BlockCache, HeapSize {
cacheStats.ioHit(timeTaken); cacheStats.ioHit(timeTaken);
} }
if (cachedBlock.getMemoryType() == MemoryType.SHARED) { if (cachedBlock.getMemoryType() == MemoryType.SHARED) {
bucketEntry.refCount.incrementAndGet(); bucketEntry.incrementRefCountAndGet();
} }
bucketEntry.access(accessCount.incrementAndGet()); bucketEntry.access(accessCount.incrementAndGet());
if (this.ioErrorStartTime > 0) { if (this.ioErrorStartTime > 0) {
@ -610,8 +611,8 @@ public class BucketCache implements BlockCache, HeapSize {
ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset()); ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset());
try { try {
lock.writeLock().lock(); lock.writeLock().lock();
int refCount = bucketEntry.refCount.get(); int refCount = bucketEntry.getRefCount();
if(refCount == 0) { if (refCount == 0) {
if (backingMap.remove(cacheKey, bucketEntry)) { if (backingMap.remove(cacheKey, bucketEntry)) {
blockEvicted(cacheKey, bucketEntry, removedBlock == null); blockEvicted(cacheKey, bucketEntry, removedBlock == null);
} else { } else {
@ -630,7 +631,7 @@ public class BucketCache implements BlockCache, HeapSize {
+ " readers. Can not be freed now. Hence will mark this" + " readers. Can not be freed now. Hence will mark this"
+ " for evicting at a later point"); + " for evicting at a later point");
} }
bucketEntry.markedForEvict = true; bucketEntry.markForEvict();
} }
} }
} finally { } finally {
@ -728,7 +729,7 @@ public class BucketCache implements BlockCache, HeapSize {
// this set is small around O(Handler Count) unless something else is wrong // this set is small around O(Handler Count) unless something else is wrong
Set<Integer> inUseBuckets = new HashSet<Integer>(); Set<Integer> inUseBuckets = new HashSet<Integer>();
for (BucketEntry entry : backingMap.values()) { for (BucketEntry entry : backingMap.values()) {
if (entry.refCount.get() != 0) { if (entry.getRefCount() != 0) {
inUseBuckets.add(bucketAllocator.getBucketIndex(entry.offset())); inUseBuckets.add(bucketAllocator.getBucketIndex(entry.offset()));
} }
} }
@ -1275,9 +1276,6 @@ public class BucketCache implements BlockCache, HeapSize {
byte deserialiserIndex; byte deserialiserIndex;
private volatile long accessCounter; private volatile long accessCounter;
private BlockPriority priority; private BlockPriority priority;
// Set this when we were not able to forcefully evict the block
private volatile boolean markedForEvict;
private AtomicInteger refCount = new AtomicInteger(0);
/** /**
* Time this block was cached. Presumes we are created just before we are added to the cache. * Time this block was cached. Presumes we are created just before we are added to the cache.
@ -1342,6 +1340,63 @@ public class BucketCache implements BlockCache, HeapSize {
public long getCachedTime() { public long getCachedTime() {
return cachedTime; return cachedTime;
} }
protected int getRefCount() {
return 0;
}
protected int incrementRefCountAndGet() {
return 0;
}
protected int decrementRefCountAndGet() {
return 0;
}
protected boolean isMarkedForEvict() {
return false;
}
protected void markForEvict() {
// noop;
}
}
static class SharedMemoryBucketEntry extends BucketEntry {
private static final long serialVersionUID = -2187147283772338481L;
// Set this when we were not able to forcefully evict the block
private volatile boolean markedForEvict;
private AtomicInteger refCount = new AtomicInteger(0);
SharedMemoryBucketEntry(long offset, int length, long accessCounter, boolean inMemory) {
super(offset, length, accessCounter, inMemory);
}
@Override
protected int getRefCount() {
return this.refCount.get();
}
@Override
protected int incrementRefCountAndGet() {
return this.refCount.incrementAndGet();
}
@Override
protected int decrementRefCountAndGet() {
return this.refCount.decrementAndGet();
}
@Override
protected boolean isMarkedForEvict() {
return this.markedForEvict;
}
@Override
protected void markForEvict() {
this.markedForEvict = true;
}
} }
/** /**
@ -1431,7 +1486,11 @@ public class BucketCache implements BlockCache, HeapSize {
// This cacheable thing can't be serialized // This cacheable thing can't be serialized
if (len == 0) return null; if (len == 0) return null;
long offset = bucketAllocator.allocateBlock(len); long offset = bucketAllocator.allocateBlock(len);
BucketEntry bucketEntry = new BucketEntry(offset, len, accessCounter, inMemory); BucketEntry bucketEntry = ioEngine.usesSharedMemory()
? UnsafeAvailChecker.isAvailable()
? new UnsafeSharedMemoryBucketEntry(offset, len, accessCounter, inMemory)
: new SharedMemoryBucketEntry(offset, len, accessCounter, inMemory)
: new BucketEntry(offset, len, accessCounter, inMemory);
bucketEntry.setDeserialiserReference(data.getDeserializer(), deserialiserMap); bucketEntry.setDeserialiserReference(data.getDeserializer(), deserialiserMap);
try { try {
if (data instanceof HFileBlock) { if (data instanceof HFileBlock) {
@ -1573,8 +1632,8 @@ public class BucketCache implements BlockCache, HeapSize {
if (block.getMemoryType() == MemoryType.SHARED) { if (block.getMemoryType() == MemoryType.SHARED) {
BucketEntry bucketEntry = backingMap.get(cacheKey); BucketEntry bucketEntry = backingMap.get(cacheKey);
if (bucketEntry != null) { if (bucketEntry != null) {
int refCount = bucketEntry.refCount.decrementAndGet(); int refCount = bucketEntry.decrementRefCountAndGet();
if (bucketEntry.markedForEvict && refCount == 0) { if (refCount == 0 && bucketEntry.isMarkedForEvict()) {
forceEvict(cacheKey); forceEvict(cacheKey);
} }
} }
@ -1585,7 +1644,7 @@ public class BucketCache implements BlockCache, HeapSize {
public int getRefCount(BlockCacheKey cacheKey) { public int getRefCount(BlockCacheKey cacheKey) {
BucketEntry bucketEntry = backingMap.get(cacheKey); BucketEntry bucketEntry = backingMap.get(cacheKey);
if (bucketEntry != null) { if (bucketEntry != null) {
return bucketEntry.refCount.get(); return bucketEntry.getRefCount();
} }
return 0; return 0;
} }

View File

@ -102,6 +102,11 @@ public class ByteBufferIOEngine implements IOEngine {
return false; return false;
} }
@Override
public boolean usesSharedMemory() {
return true;
}
@Override @Override
public Cacheable read(long offset, int length, CacheableDeserializer<Cacheable> deserializer) public Cacheable read(long offset, int length, CacheableDeserializer<Cacheable> deserializer)
throws IOException { throws IOException {

View File

@ -37,6 +37,15 @@ public interface IOEngine {
*/ */
boolean isPersistent(); boolean isPersistent();
/**
* IOEngine uses shared memory means, when reading Cacheable from it, those refers to the same
* memory area as used by the Engine for caching it.
* @return true when IOEngine using shared memory.
*/
default boolean usesSharedMemory() {
return false;
}
/** /**
* Transfers data from IOEngine to a Cacheable object. * Transfers data from IOEngine to a Cacheable object.
* @param length How many bytes to be read from the offset * @param length How many bytes to be read from the offset

View File

@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile.bucket;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BucketEntry;
import org.apache.hadoop.hbase.util.UnsafeAccess;
import org.apache.yetus.audience.InterfaceAudience;
import sun.misc.Unsafe;
@InterfaceAudience.Private
public class UnsafeSharedMemoryBucketEntry extends BucketEntry {
private static final long serialVersionUID = 707544024564058801L;
// We are just doing what AtomicInteger doing for the Atomic incrementAndGet/decrementAndGet.
// We are avoiding the need to have a field of AtomicIneger type and have it as just int type.
// We would like to reduce the head overhead per object of this type as much as possible.
// Doing this direct Unsafe usage save us 16 bytes per Object.
// ie Just using 4 bytes for int type than 20 bytes requirement for an AtomicInteger (16 bytes)
// and 4 bytes reference to it.
private static final Unsafe unsafe = UnsafeAccess.theUnsafe;
private static final long refCountOffset;
static {
try {
refCountOffset = unsafe
.objectFieldOffset(UnsafeSharedMemoryBucketEntry.class.getDeclaredField("refCount"));
} catch (Exception ex) {
throw new Error(ex);
}
}
// Set this when we were not able to forcefully evict the block
private volatile boolean markedForEvict;
private volatile int refCount = 0;
public UnsafeSharedMemoryBucketEntry(long offset, int length, long accessCounter,
boolean inMemory) {
super(offset, length, accessCounter, inMemory);
}
@Override
protected int getRefCount() {
return this.refCount;
}
@Override
protected int incrementRefCountAndGet() {
return unsafe.getAndAddInt(this, refCountOffset, 1) + 1;
}
@Override
protected int decrementRefCountAndGet() {
return unsafe.getAndAddInt(this, refCountOffset, -1) - 1;
}
@Override
protected boolean isMarkedForEvict() {
return this.markedForEvict;
}
@Override
protected void markForEvict() {
this.markedForEvict = true;
}
}