HBASE-17819 Reduce the heap overhead for BucketCache.
commit b5881dbd3f
parent c8c670b561
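For context on the title: the saving comes from slimming down BucketCache's per-block bookkeeping object (BucketEntry). The comment in the new UnsafeSharedMemoryBucketEntry below puts the per-entry saving at about 16 bytes (a plain int instead of an AtomicInteger plus a reference to it). A rough back-of-the-envelope sketch of how that adds up, using a purely hypothetical block count:

// Rough estimate only; the 16-byte figure comes from the comment in
// UnsafeSharedMemoryBucketEntry below, the block count is hypothetical.
public class HeapOverheadEstimate {
  public static void main(String[] args) {
    long cachedBlocks = 4_000_000L;   // hypothetical number of blocks in the bucket cache
    long savedPerEntry = 16L;         // bytes saved by dropping the AtomicInteger per entry
    long savedBytes = cachedBlocks * savedPerEntry;
    System.out.println("Approximate heap saving: " + savedBytes / (1024 * 1024) + " MB");
  }
}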
UnsafeAccess.java

@@ -37,7 +37,7 @@ public final class UnsafeAccess {
 
   private static final Logger LOG = LoggerFactory.getLogger(UnsafeAccess.class);
 
-  static final Unsafe theUnsafe;
+  public static final Unsafe theUnsafe;
 
   /** The offset to the first element in a byte array. */
   public static final long BYTE_ARRAY_BASE_OFFSET;
BucketCache.java

@@ -72,6 +72,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.HasThread;
 import org.apache.hadoop.hbase.util.IdReadWriteLock;
 import org.apache.hadoop.hbase.util.IdReadWriteLock.ReferenceType;
+import org.apache.hadoop.hbase.util.UnsafeAvailChecker;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -517,7 +518,7 @@ public class BucketCache implements BlockCache, HeapSize {
         cacheStats.ioHit(timeTaken);
       }
       if (cachedBlock.getMemoryType() == MemoryType.SHARED) {
-        bucketEntry.refCount.incrementAndGet();
+        bucketEntry.incrementRefCountAndGet();
       }
       bucketEntry.access(accessCount.incrementAndGet());
       if (this.ioErrorStartTime > 0) {
@@ -610,8 +611,8 @@ public class BucketCache implements BlockCache, HeapSize {
     ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset());
     try {
       lock.writeLock().lock();
-      int refCount = bucketEntry.refCount.get();
-      if(refCount == 0) {
+      int refCount = bucketEntry.getRefCount();
+      if (refCount == 0) {
         if (backingMap.remove(cacheKey, bucketEntry)) {
           blockEvicted(cacheKey, bucketEntry, removedBlock == null);
         } else {
@@ -630,7 +631,7 @@ public class BucketCache implements BlockCache, HeapSize {
               + " readers. Can not be freed now. Hence will mark this"
               + " for evicting at a later point");
         }
-        bucketEntry.markedForEvict = true;
+        bucketEntry.markForEvict();
       }
     }
   } finally {
@@ -728,7 +729,7 @@ public class BucketCache implements BlockCache, HeapSize {
     // this set is small around O(Handler Count) unless something else is wrong
     Set<Integer> inUseBuckets = new HashSet<Integer>();
     for (BucketEntry entry : backingMap.values()) {
-      if (entry.refCount.get() != 0) {
+      if (entry.getRefCount() != 0) {
         inUseBuckets.add(bucketAllocator.getBucketIndex(entry.offset()));
       }
     }
@@ -1275,9 +1276,6 @@ public class BucketCache implements BlockCache, HeapSize {
     byte deserialiserIndex;
     private volatile long accessCounter;
     private BlockPriority priority;
-    // Set this when we were not able to forcefully evict the block
-    private volatile boolean markedForEvict;
-    private AtomicInteger refCount = new AtomicInteger(0);
 
     /**
      * Time this block was cached. Presumes we are created just before we are added to the cache.
@@ -1342,6 +1340,63 @@ public class BucketCache implements BlockCache, HeapSize {
     public long getCachedTime() {
       return cachedTime;
     }
+
+    protected int getRefCount() {
+      return 0;
+    }
+
+    protected int incrementRefCountAndGet() {
+      return 0;
+    }
+
+    protected int decrementRefCountAndGet() {
+      return 0;
+    }
+
+    protected boolean isMarkedForEvict() {
+      return false;
+    }
+
+    protected void markForEvict() {
+      // noop;
+    }
   }
+
+  static class SharedMemoryBucketEntry extends BucketEntry {
+    private static final long serialVersionUID = -2187147283772338481L;
+
+    // Set this when we were not able to forcefully evict the block
+    private volatile boolean markedForEvict;
+    private AtomicInteger refCount = new AtomicInteger(0);
+
+    SharedMemoryBucketEntry(long offset, int length, long accessCounter, boolean inMemory) {
+      super(offset, length, accessCounter, inMemory);
+    }
+
+    @Override
+    protected int getRefCount() {
+      return this.refCount.get();
+    }
+
+    @Override
+    protected int incrementRefCountAndGet() {
+      return this.refCount.incrementAndGet();
+    }
+
+    @Override
+    protected int decrementRefCountAndGet() {
+      return this.refCount.decrementAndGet();
+    }
+
+    @Override
+    protected boolean isMarkedForEvict() {
+      return this.markedForEvict;
+    }
+
+    @Override
+    protected void markForEvict() {
+      this.markedForEvict = true;
+    }
+  }
 
   /**
@@ -1431,7 +1486,11 @@ public class BucketCache implements BlockCache, HeapSize {
       // This cacheable thing can't be serialized
       if (len == 0) return null;
       long offset = bucketAllocator.allocateBlock(len);
-      BucketEntry bucketEntry = new BucketEntry(offset, len, accessCounter, inMemory);
+      BucketEntry bucketEntry = ioEngine.usesSharedMemory()
+          ? UnsafeAvailChecker.isAvailable()
+              ? new UnsafeSharedMemoryBucketEntry(offset, len, accessCounter, inMemory)
+              : new SharedMemoryBucketEntry(offset, len, accessCounter, inMemory)
+          : new BucketEntry(offset, len, accessCounter, inMemory);
       bucketEntry.setDeserialiserReference(data.getDeserializer(), deserialiserMap);
       try {
         if (data instanceof HFileBlock) {
@@ -1573,8 +1632,8 @@ public class BucketCache implements BlockCache, HeapSize {
       if (block.getMemoryType() == MemoryType.SHARED) {
         BucketEntry bucketEntry = backingMap.get(cacheKey);
         if (bucketEntry != null) {
-          int refCount = bucketEntry.refCount.decrementAndGet();
-          if (bucketEntry.markedForEvict && refCount == 0) {
+          int refCount = bucketEntry.decrementRefCountAndGet();
+          if (refCount == 0 && bucketEntry.isMarkedForEvict()) {
             forceEvict(cacheKey);
           }
         }
@@ -1585,7 +1644,7 @@ public class BucketCache implements BlockCache, HeapSize {
   public int getRefCount(BlockCacheKey cacheKey) {
     BucketEntry bucketEntry = backingMap.get(cacheKey);
     if (bucketEntry != null) {
-      return bucketEntry.refCount.get();
+      return bucketEntry.getRefCount();
     }
     return 0;
   }
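The BucketCache hunks above all route through the same protocol: a reader of a shared-memory block pins its BucketEntry before use, eviction frees the entry only when nobody holds it and otherwise defers by marking it, and the last reader to release performs the deferred eviction. A minimal, self-contained sketch of that protocol (plain Java, not the HBase classes; names are illustrative):

import java.util.concurrent.atomic.AtomicInteger;

class RefCountedEntry {
  private final AtomicInteger refCount = new AtomicInteger(0);
  private volatile boolean markedForEvict;

  int getRefCount() { return refCount.get(); }
  int incrementRefCountAndGet() { return refCount.incrementAndGet(); }
  int decrementRefCountAndGet() { return refCount.decrementAndGet(); }
  boolean isMarkedForEvict() { return markedForEvict; }
  void markForEvict() { markedForEvict = true; }
}

public class RefCountProtocolSketch {
  public static void main(String[] args) {
    RefCountedEntry entry = new RefCountedEntry();

    // Reader path: pin the entry before handing out a block backed by shared memory.
    entry.incrementRefCountAndGet();

    // Eviction path: cannot free while a reader holds the entry, so defer.
    if (entry.getRefCount() == 0) {
      System.out.println("freed immediately");
    } else {
      entry.markForEvict();
      System.out.println("in use; marked for eviction at a later point");
    }

    // Release path: the last reader to let go triggers the deferred eviction.
    if (entry.decrementRefCountAndGet() == 0 && entry.isMarkedForEvict()) {
      System.out.println("force evicting now");
    }
  }
}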
ByteBufferIOEngine.java

@@ -102,6 +102,11 @@ public class ByteBufferIOEngine implements IOEngine {
     return false;
   }
 
+  @Override
+  public boolean usesSharedMemory() {
+    return true;
+  }
+
   @Override
   public Cacheable read(long offset, int length, CacheableDeserializer<Cacheable> deserializer)
       throws IOException {
IOEngine.java

@@ -37,6 +37,15 @@ public interface IOEngine {
    */
   boolean isPersistent();
 
+  /**
+   * Returns whether this IOEngine caches blocks in shared memory, i.e. a Cacheable read from it
+   * refers to the same memory area the engine uses to cache it.
+   * @return true when this IOEngine uses shared memory.
+   */
+  default boolean usesSharedMemory() {
+    return false;
+  }
+
   /**
    * Transfers data from IOEngine to a Cacheable object.
    * @param length How many bytes to be read from the offset
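The default method above means existing IOEngine implementations need no change: they keep reporting false (blocks read from them are copies), while engines that serve blocks straight out of their own memory, like ByteBufferIOEngine above, opt in by overriding. A small stand-in sketch of that pattern (the interface and classes below are illustrative, not the real HBase types):

// Stand-in for IOEngine; only the part relevant to this patch is sketched.
interface EngineSketch {
  boolean isPersistent();

  // Mirrors the new IOEngine#usesSharedMemory() default: false unless overridden.
  default boolean usesSharedMemory() {
    return false;
  }
}

// Hypothetical copy-on-read engine: inherits the default, so callers skip ref counting.
class CopyingEngineSketch implements EngineSketch {
  @Override
  public boolean isPersistent() {
    return true;
  }
}

// Hypothetical memory-backed engine: serves blocks from its own buffers, so it opts in.
class SharedMemoryEngineSketch implements EngineSketch {
  @Override
  public boolean isPersistent() {
    return false;
  }

  @Override
  public boolean usesSharedMemory() {
    return true;
  }
}

public class UsesSharedMemorySketch {
  public static void main(String[] args) {
    System.out.println(new CopyingEngineSketch().usesSharedMemory());      // false
    System.out.println(new SharedMemoryEngineSketch().usesSharedMemory()); // true
  }
}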
UnsafeSharedMemoryBucketEntry.java (new file)

@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile.bucket;
+
+import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BucketEntry;
+import org.apache.hadoop.hbase.util.UnsafeAccess;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import sun.misc.Unsafe;
+
+@InterfaceAudience.Private
+public class UnsafeSharedMemoryBucketEntry extends BucketEntry {
+  private static final long serialVersionUID = 707544024564058801L;
+
+  // We are just doing what AtomicInteger does for its atomic incrementAndGet/decrementAndGet.
+  // We avoid the need for a field of AtomicInteger type and keep the count as a plain int.
+  // We would like to reduce the heap overhead per object of this type as much as possible.
+  // Using Unsafe directly saves us 16 bytes per object:
+  // just 4 bytes for the int, versus the roughly 20 bytes an AtomicInteger needs
+  // (a 16-byte object plus a 4-byte reference to it).
+  private static final Unsafe unsafe = UnsafeAccess.theUnsafe;
+  private static final long refCountOffset;
+
+  static {
+    try {
+      refCountOffset = unsafe
+          .objectFieldOffset(UnsafeSharedMemoryBucketEntry.class.getDeclaredField("refCount"));
+    } catch (Exception ex) {
+      throw new Error(ex);
+    }
+  }
+
+  // Set this when we were not able to forcefully evict the block
+  private volatile boolean markedForEvict;
+  private volatile int refCount = 0;
+
+  public UnsafeSharedMemoryBucketEntry(long offset, int length, long accessCounter,
+      boolean inMemory) {
+    super(offset, length, accessCounter, inMemory);
+  }
+
+  @Override
+  protected int getRefCount() {
+    return this.refCount;
+  }
+
+  @Override
+  protected int incrementRefCountAndGet() {
+    return unsafe.getAndAddInt(this, refCountOffset, 1) + 1;
+  }
+
+  @Override
+  protected int decrementRefCountAndGet() {
+    return unsafe.getAndAddInt(this, refCountOffset, -1) - 1;
+  }
+
+  @Override
+  protected boolean isMarkedForEvict() {
+    return this.markedForEvict;
+  }
+
+  @Override
+  protected void markForEvict() {
+    this.markedForEvict = true;
+  }
+}
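For readers unfamiliar with the Unsafe trick above, here is a standalone sketch of the same pattern outside HBase (assumes JDK 8+, where sun.misc.Unsafe exposes objectFieldOffset and getAndAddInt; class and field names are illustrative):

import java.lang.reflect.Field;
import sun.misc.Unsafe;

public class UnsafeRefCountSketch {
  private static final Unsafe UNSAFE;
  private static final long REF_COUNT_OFFSET;

  static {
    try {
      // Standard reflective access to the Unsafe singleton; HBase gets it via UnsafeAccess.
      Field f = Unsafe.class.getDeclaredField("theUnsafe");
      f.setAccessible(true);
      UNSAFE = (Unsafe) f.get(null);
      REF_COUNT_OFFSET =
          UNSAFE.objectFieldOffset(UnsafeRefCountSketch.class.getDeclaredField("refCount"));
    } catch (Exception e) {
      throw new Error(e);
    }
  }

  // Plain 4-byte field; atomicity comes from the Unsafe calls below, not from AtomicInteger.
  private volatile int refCount = 0;

  int incrementAndGet() {
    // getAndAddInt returns the previous value, so add the delta to get the new value.
    return UNSAFE.getAndAddInt(this, REF_COUNT_OFFSET, 1) + 1;
  }

  int decrementAndGet() {
    return UNSAFE.getAndAddInt(this, REF_COUNT_OFFSET, -1) - 1;
  }

  public static void main(String[] args) {
    UnsafeRefCountSketch entry = new UnsafeRefCountSketch();
    System.out.println(entry.incrementAndGet()); // 1
    System.out.println(entry.incrementAndGet()); // 2
    System.out.println(entry.decrementAndGet()); // 1
  }
}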