From 3f7222df3699ef11d0782628c8358ad5d0ce108b Mon Sep 17 00:00:00 2001 From: anoopsamjohn Date: Sun, 25 Mar 2018 16:36:30 +0530 Subject: [PATCH] HBASE-17819 Reduce the heap overhead for BucketCache. --- .../hadoop/hbase/util/UnsafeAccess.java | 2 +- .../hbase/io/hfile/bucket/BucketCache.java | 83 ++++++++++++++++--- .../io/hfile/bucket/ByteBufferIOEngine.java | 5 ++ .../hbase/io/hfile/bucket/IOEngine.java | 9 ++ .../bucket/UnsafeSharedMemoryBucketEntry.java | 81 ++++++++++++++++++ 5 files changed, 167 insertions(+), 13 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/UnsafeSharedMemoryBucketEntry.java diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/UnsafeAccess.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/UnsafeAccess.java index feaa9e66242..486f81beb6d 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/UnsafeAccess.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/UnsafeAccess.java @@ -37,7 +37,7 @@ public final class UnsafeAccess { private static final Logger LOG = LoggerFactory.getLogger(UnsafeAccess.class); - static final Unsafe theUnsafe; + public static final Unsafe theUnsafe; /** The offset to the first element in a byte array. */ public static final long BYTE_ARRAY_BASE_OFFSET; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index e9129d2f5b2..673057c7949 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -72,6 +72,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.HasThread; import org.apache.hadoop.hbase.util.IdReadWriteLock; import org.apache.hadoop.hbase.util.IdReadWriteLock.ReferenceType; +import org.apache.hadoop.hbase.util.UnsafeAvailChecker; import org.apache.hadoop.util.StringUtils; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -517,7 +518,7 @@ public class BucketCache implements BlockCache, HeapSize { cacheStats.ioHit(timeTaken); } if (cachedBlock.getMemoryType() == MemoryType.SHARED) { - bucketEntry.refCount.incrementAndGet(); + bucketEntry.incrementRefCountAndGet(); } bucketEntry.access(accessCount.incrementAndGet()); if (this.ioErrorStartTime > 0) { @@ -610,8 +611,8 @@ public class BucketCache implements BlockCache, HeapSize { ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset()); try { lock.writeLock().lock(); - int refCount = bucketEntry.refCount.get(); - if(refCount == 0) { + int refCount = bucketEntry.getRefCount(); + if (refCount == 0) { if (backingMap.remove(cacheKey, bucketEntry)) { blockEvicted(cacheKey, bucketEntry, removedBlock == null); } else { @@ -630,7 +631,7 @@ public class BucketCache implements BlockCache, HeapSize { + " readers. Can not be freed now. Hence will mark this" + " for evicting at a later point"); } - bucketEntry.markedForEvict = true; + bucketEntry.markForEvict(); } } } finally { @@ -728,7 +729,7 @@ public class BucketCache implements BlockCache, HeapSize { // this set is small around O(Handler Count) unless something else is wrong Set inUseBuckets = new HashSet(); for (BucketEntry entry : backingMap.values()) { - if (entry.refCount.get() != 0) { + if (entry.getRefCount() != 0) { inUseBuckets.add(bucketAllocator.getBucketIndex(entry.offset())); } } @@ -1275,9 +1276,6 @@ public class BucketCache implements BlockCache, HeapSize { byte deserialiserIndex; private volatile long accessCounter; private BlockPriority priority; - // Set this when we were not able to forcefully evict the block - private volatile boolean markedForEvict; - private AtomicInteger refCount = new AtomicInteger(0); /** * Time this block was cached. Presumes we are created just before we are added to the cache. @@ -1342,6 +1340,63 @@ public class BucketCache implements BlockCache, HeapSize { public long getCachedTime() { return cachedTime; } + + protected int getRefCount() { + return 0; + } + + protected int incrementRefCountAndGet() { + return 0; + } + + protected int decrementRefCountAndGet() { + return 0; + } + + protected boolean isMarkedForEvict() { + return false; + } + + protected void markForEvict() { + // noop; + } + } + + static class SharedMemoryBucketEntry extends BucketEntry { + private static final long serialVersionUID = -2187147283772338481L; + + // Set this when we were not able to forcefully evict the block + private volatile boolean markedForEvict; + private AtomicInteger refCount = new AtomicInteger(0); + + SharedMemoryBucketEntry(long offset, int length, long accessCounter, boolean inMemory) { + super(offset, length, accessCounter, inMemory); + } + + @Override + protected int getRefCount() { + return this.refCount.get(); + } + + @Override + protected int incrementRefCountAndGet() { + return this.refCount.incrementAndGet(); + } + + @Override + protected int decrementRefCountAndGet() { + return this.refCount.decrementAndGet(); + } + + @Override + protected boolean isMarkedForEvict() { + return this.markedForEvict; + } + + @Override + protected void markForEvict() { + this.markedForEvict = true; + } } /** @@ -1431,7 +1486,11 @@ public class BucketCache implements BlockCache, HeapSize { // This cacheable thing can't be serialized if (len == 0) return null; long offset = bucketAllocator.allocateBlock(len); - BucketEntry bucketEntry = new BucketEntry(offset, len, accessCounter, inMemory); + BucketEntry bucketEntry = ioEngine.usesSharedMemory() + ? UnsafeAvailChecker.isAvailable() + ? new UnsafeSharedMemoryBucketEntry(offset, len, accessCounter, inMemory) + : new SharedMemoryBucketEntry(offset, len, accessCounter, inMemory) + : new BucketEntry(offset, len, accessCounter, inMemory); bucketEntry.setDeserialiserReference(data.getDeserializer(), deserialiserMap); try { if (data instanceof HFileBlock) { @@ -1573,8 +1632,8 @@ public class BucketCache implements BlockCache, HeapSize { if (block.getMemoryType() == MemoryType.SHARED) { BucketEntry bucketEntry = backingMap.get(cacheKey); if (bucketEntry != null) { - int refCount = bucketEntry.refCount.decrementAndGet(); - if (bucketEntry.markedForEvict && refCount == 0) { + int refCount = bucketEntry.decrementRefCountAndGet(); + if (refCount == 0 && bucketEntry.isMarkedForEvict()) { forceEvict(cacheKey); } } @@ -1585,7 +1644,7 @@ public class BucketCache implements BlockCache, HeapSize { public int getRefCount(BlockCacheKey cacheKey) { BucketEntry bucketEntry = backingMap.get(cacheKey); if (bucketEntry != null) { - return bucketEntry.refCount.get(); + return bucketEntry.getRefCount(); } return 0; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java index 9f4ffba94a3..3b832fe3974 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java @@ -102,6 +102,11 @@ public class ByteBufferIOEngine implements IOEngine { return false; } + @Override + public boolean usesSharedMemory() { + return true; + } + @Override public Cacheable read(long offset, int length, CacheableDeserializer deserializer) throws IOException { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/IOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/IOEngine.java index 61bde6b29c1..87f71a52213 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/IOEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/IOEngine.java @@ -37,6 +37,15 @@ public interface IOEngine { */ boolean isPersistent(); + /** + * IOEngine uses shared memory means, when reading Cacheable from it, those refers to the same + * memory area as used by the Engine for caching it. + * @return true when IOEngine using shared memory. + */ + default boolean usesSharedMemory() { + return false; + } + /** * Transfers data from IOEngine to a Cacheable object. * @param length How many bytes to be read from the offset diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/UnsafeSharedMemoryBucketEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/UnsafeSharedMemoryBucketEntry.java new file mode 100644 index 00000000000..5d93e9715dc --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/UnsafeSharedMemoryBucketEntry.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io.hfile.bucket; + +import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BucketEntry; +import org.apache.hadoop.hbase.util.UnsafeAccess; +import org.apache.yetus.audience.InterfaceAudience; + +import sun.misc.Unsafe; + +@InterfaceAudience.Private +public class UnsafeSharedMemoryBucketEntry extends BucketEntry { + private static final long serialVersionUID = 707544024564058801L; + + // We are just doing what AtomicInteger doing for the Atomic incrementAndGet/decrementAndGet. + // We are avoiding the need to have a field of AtomicIneger type and have it as just int type. + // We would like to reduce the head overhead per object of this type as much as possible. + // Doing this direct Unsafe usage save us 16 bytes per Object. + // ie Just using 4 bytes for int type than 20 bytes requirement for an AtomicInteger (16 bytes) + // and 4 bytes reference to it. + private static final Unsafe unsafe = UnsafeAccess.theUnsafe; + private static final long refCountOffset; + + static { + try { + refCountOffset = unsafe + .objectFieldOffset(UnsafeSharedMemoryBucketEntry.class.getDeclaredField("refCount")); + } catch (Exception ex) { + throw new Error(ex); + } + } + + // Set this when we were not able to forcefully evict the block + private volatile boolean markedForEvict; + private volatile int refCount = 0; + + public UnsafeSharedMemoryBucketEntry(long offset, int length, long accessCounter, + boolean inMemory) { + super(offset, length, accessCounter, inMemory); + } + + @Override + protected int getRefCount() { + return this.refCount; + } + + @Override + protected int incrementRefCountAndGet() { + return unsafe.getAndAddInt(this, refCountOffset, 1) + 1; + } + + @Override + protected int decrementRefCountAndGet() { + return unsafe.getAndAddInt(this, refCountOffset, -1) - 1; + } + + @Override + protected boolean isMarkedForEvict() { + return this.markedForEvict; + } + + @Override + protected void markForEvict() { + this.markedForEvict = true; + } +} \ No newline at end of file