HBASE-21957 Unify refCount of BucketEntry and refCount of hbase.nio.ByteBuff into one

This commit is contained in:
huzheng 2019-04-10 16:33:30 +08:00
parent 773c0d6635
commit 48aca4db30
31 changed files with 913 additions and 701 deletions

View File

@ -22,14 +22,12 @@ import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.util.List;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ObjectIntPair;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.io.netty.util.ReferenceCounted;
import org.apache.hbase.thirdparty.io.netty.util.internal.ObjectUtil;
@ -39,7 +37,7 @@ import org.apache.hbase.thirdparty.io.netty.util.internal.ObjectUtil;
* provides APIs similar to the ones provided in java's nio ByteBuffers and allows you to do
* positional reads/writes and relative reads and writes on the underlying BB. In addition to it, we
* have some additional APIs which helps us in the read path. <br/>
* The ByteBuff implement {@link ReferenceCounted} interface which mean need to maintains a
* The ByteBuff implement {@link HBaseReferenceCounted} interface which mean need to maintains a
* {@link RefCnt} inside, if ensure that the ByteBuff won't be used any more, we must do a
* {@link ByteBuff#release()} to recycle its NIO ByteBuffers. when considering the
* {@link ByteBuff#duplicate()} or {@link ByteBuff#slice()}, releasing either the duplicated one or
@ -59,7 +57,7 @@ import org.apache.hbase.thirdparty.io.netty.util.internal.ObjectUtil;
* </pre>
*/
@InterfaceAudience.Private
public abstract class ByteBuff implements ReferenceCounted {
public abstract class ByteBuff implements HBaseReferenceCounted {
private static final String REFERENCE_COUNT_NAME = "ReferenceCount";
private static final int NIO_BUFFER_LIMIT = 64 * 1024; // should not be more than 64KB.
@ -80,26 +78,6 @@ public abstract class ByteBuff implements ReferenceCounted {
return refCnt.release();
}
@Override
public final ByteBuff retain(int increment) {
throw new UnsupportedOperationException();
}
@Override
public final boolean release(int increment) {
throw new UnsupportedOperationException();
}
@Override
public final ByteBuff touch() {
throw new UnsupportedOperationException();
}
@Override
public final ByteBuff touch(Object hint) {
throw new UnsupportedOperationException();
}
/******************************* Methods for ByteBuff **************************************/
/**
@ -563,31 +541,56 @@ public abstract class ByteBuff implements ReferenceCounted {
/********************************* ByteBuff wrapper methods ***********************************/
public static ByteBuff wrap(ByteBuffer[] buffers, Recycler recycler) {
/**
* In theory, the upstream should never construct an ByteBuff by passing an given refCnt, so
* please don't use this public method in other place. Make the method public here because the
* BucketEntry#wrapAsCacheable in hbase-server module will use its own refCnt and ByteBuffers from
* IOEngine to composite an HFileBlock's ByteBuff, we didn't find a better way so keep the public
* way here.
*/
public static ByteBuff wrap(ByteBuffer[] buffers, RefCnt refCnt) {
if (buffers == null || buffers.length == 0) {
throw new IllegalArgumentException("buffers shouldn't be null or empty");
}
return buffers.length == 1 ? new SingleByteBuff(recycler, buffers[0])
: new MultiByteBuff(recycler, buffers);
return buffers.length == 1 ? new SingleByteBuff(refCnt, buffers[0])
: new MultiByteBuff(refCnt, buffers);
}
public static ByteBuff wrap(ByteBuffer[] buffers, Recycler recycler) {
return wrap(buffers, RefCnt.create(recycler));
}
public static ByteBuff wrap(ByteBuffer[] buffers) {
return wrap(buffers, ByteBuffAllocator.NONE);
return wrap(buffers, RefCnt.create());
}
public static ByteBuff wrap(List<ByteBuffer> buffers, Recycler recycler) {
if (buffers == null || buffers.size() == 0) {
throw new IllegalArgumentException("buffers shouldn't be null or empty");
}
return buffers.size() == 1 ? new SingleByteBuff(recycler, buffers.get(0))
: new MultiByteBuff(recycler, buffers.toArray(new ByteBuffer[0]));
return wrap(buffers, RefCnt.create(recycler));
}
public static ByteBuff wrap(List<ByteBuffer> buffers) {
return wrap(buffers, ByteBuffAllocator.NONE);
return wrap(buffers, RefCnt.create());
}
public static ByteBuff wrap(ByteBuffer buffer) {
return new SingleByteBuff(ByteBuffAllocator.NONE, buffer);
return wrap(buffer, RefCnt.create());
}
/**
* Make this private because we don't want to expose the refCnt related wrap method to upstream.
*/
private static ByteBuff wrap(List<ByteBuffer> buffers, RefCnt refCnt) {
if (buffers == null || buffers.size() == 0) {
throw new IllegalArgumentException("buffers shouldn't be null or empty");
}
return buffers.size() == 1 ? new SingleByteBuff(refCnt, buffers.get(0))
: new MultiByteBuff(refCnt, buffers.toArray(new ByteBuffer[0]));
}
/**
* Make this private because we don't want to expose the refCnt related wrap method to upstream.
*/
private static ByteBuff wrap(ByteBuffer buffer, RefCnt refCnt) {
return new SingleByteBuff(refCnt, buffer);
}
}

View File

@ -0,0 +1,50 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.nio;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.io.netty.util.ReferenceCounted;
/**
* The HBaseReferenceCounted disabled several methods in Netty's {@link ReferenceCounted}, because
* those methods are unlikely to be used.
*/
@InterfaceAudience.Private
public interface HBaseReferenceCounted extends ReferenceCounted {
@Override
default HBaseReferenceCounted retain(int increment) {
throw new UnsupportedOperationException();
}
@Override
default boolean release(int increment) {
throw new UnsupportedOperationException();
}
@Override
default HBaseReferenceCounted touch() {
throw new UnsupportedOperationException();
}
@Override
default HBaseReferenceCounted touch(Object hint) {
throw new UnsupportedOperationException();
}
}

View File

@ -61,7 +61,7 @@ public class MultiByteBuff extends ByteBuff {
this(new RefCnt(recycler), items);
}
private MultiByteBuff(RefCnt refCnt, ByteBuffer... items) {
MultiByteBuff(RefCnt refCnt, ByteBuffer... items) {
this.refCnt = refCnt;
assert items != null;
assert items.length > 0;

View File

@ -29,11 +29,25 @@ import org.apache.hbase.thirdparty.io.netty.util.ReferenceCounted;
* reference count become 0, it'll call {@link Recycler#free()} once.
*/
@InterfaceAudience.Private
class RefCnt extends AbstractReferenceCounted {
public class RefCnt extends AbstractReferenceCounted {
private Recycler recycler = ByteBuffAllocator.NONE;
RefCnt(Recycler recycler) {
/**
* Create an {@link RefCnt} with an initial reference count = 1. If the reference count become
* zero, the recycler will do nothing. Usually, an Heap {@link ByteBuff} will use this kind of
* refCnt to track its life cycle, it help to abstract the code path although it's meaningless to
* use an refCnt for heap ByteBuff.
*/
public static RefCnt create() {
return new RefCnt(ByteBuffAllocator.NONE);
}
public static RefCnt create(Recycler recycler) {
return new RefCnt(recycler);
}
public RefCnt(Recycler recycler) {
this.recycler = recycler;
}

View File

@ -57,7 +57,7 @@ public class SingleByteBuff extends ByteBuff {
this(new RefCnt(recycler), buf);
}
private SingleByteBuff(RefCnt refCnt, ByteBuffer buf) {
SingleByteBuff(RefCnt refCnt, ByteBuffer buf) {
this.refCnt = refCnt;
this.buf = buf;
if (buf.hasArray()) {

View File

@ -192,15 +192,17 @@ public class ByteBufferArray {
}
/**
* Creates a ByteBuff from a given array of ByteBuffers from the given offset to the length
* Creates a sub-array from a given array of ByteBuffers from the given offset to the length
* specified. For eg, if there are 4 buffers forming an array each with length 10 and if we call
* asSubBuffer(5, 10) then we will create an MBB consisting of two BBs and the first one be a BB
* from 'position' 5 to a 'length' 5 and the 2nd BB will be from 'position' 0 to 'length' 5.
* asSubByteBuffers(5, 10) then we will create an sub-array consisting of two BBs and the first
* one be a BB from 'position' 5 to a 'length' 5 and the 2nd BB will be from 'position' 0 to
* 'length' 5.
* @param offset the position in the whole array which is composited by multiple byte buffers.
* @param len the length of bytes
* @return a ByteBuff formed from the underlying ByteBuffers
* @return the underlying ByteBuffers, each ByteBuffer is a slice from the backend and will have a
* zero position.
*/
public ByteBuff asSubByteBuff(long offset, final int len) {
public ByteBuffer[] asSubByteBuffers(long offset, final int len) {
BufferIterator it = new BufferIterator(offset, len);
ByteBuffer[] mbb = new ByteBuffer[it.getBufferCount()];
for (int i = 0; i < mbb.length; i++) {
@ -208,7 +210,7 @@ public class ByteBufferArray {
mbb[i] = it.next();
}
assert it.getSum() == len;
return ByteBuff.wrap(mbb);
return mbb;
}
/**

View File

@ -51,7 +51,7 @@ public class TestByteBufferArray {
public void testAsSubBufferWhenEndOffsetLandInLastBuffer() throws Exception {
int capacity = 4 * 1024 * 1024;
ByteBufferArray array = new ByteBufferArray(capacity, ALLOC);
ByteBuff subBuf = array.asSubByteBuff(0, capacity);
ByteBuff subBuf = ByteBuff.wrap(array.asSubByteBuffers(0, capacity));
subBuf.position(capacity - 1);// Position to the last byte
assertTrue(subBuf.hasRemaining());
// Read last byte
@ -179,7 +179,7 @@ public class TestByteBufferArray {
}
private void testAsSubByteBuff(ByteBufferArray array, int off, int len, boolean isMulti) {
ByteBuff ret = array.asSubByteBuff(off, len);
ByteBuff ret = ByteBuff.wrap(array.asSubByteBuffers(off, len));
if (isMulti) {
assertTrue(ret instanceof MultiByteBuff);
} else {

View File

@ -228,8 +228,8 @@ public class BlockCacheUtil {
*/
public static boolean shouldReplaceExistingCacheBlock(BlockCache blockCache,
BlockCacheKey cacheKey, Cacheable newBlock) {
// NOTICE: The getBlock has retained the existingBlock inside.
Cacheable existingBlock = blockCache.getBlock(cacheKey, false, false, false);
existingBlock.retain();
try {
int comparison = BlockCacheUtil.validateBlockAddition(existingBlock, newBlock, cacheKey);
if (comparison < 0) {

View File

@ -21,10 +21,9 @@ package org.apache.hadoop.hbase.io.hfile;
import java.nio.ByteBuffer;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hbase.thirdparty.io.netty.util.ReferenceCounted;
import org.apache.hadoop.hbase.nio.HBaseReferenceCounted;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Cacheable is an interface that allows for an object to be cached. If using an
@ -36,7 +35,7 @@ import org.apache.hbase.thirdparty.io.netty.util.ReferenceCounted;
*
*/
@InterfaceAudience.Private
public interface Cacheable extends HeapSize, ReferenceCounted {
public interface Cacheable extends HeapSize, HBaseReferenceCounted {
/**
* Returns the length of the ByteBuffer required to serialized the object. If the
* object cannot be serialized, it should return 0.
@ -87,10 +86,6 @@ public interface Cacheable extends HeapSize, ReferenceCounted {
return this;
}
default Cacheable retain(int increment) {
throw new UnsupportedOperationException();
}
/**
* Reference count of this Cacheable.
*/
@ -106,16 +101,4 @@ public interface Cacheable extends HeapSize, ReferenceCounted {
default boolean release() {
return false;
}
default boolean release(int increment) {
throw new UnsupportedOperationException();
}
default ReferenceCounted touch() {
throw new UnsupportedOperationException();
}
default ReferenceCounted touch(Object hint) {
throw new UnsupportedOperationException();
}
}

View File

@ -386,9 +386,10 @@ public class CombinedBlockCache implements ResizableBlockCache, HeapSize {
}
@VisibleForTesting
public int getRefCount(BlockCacheKey cacheKey) {
public int getRpcRefCount(BlockCacheKey cacheKey) {
return (this.l2Cache instanceof BucketCache)
? ((BucketCache) this.l2Cache).getRefCount(cacheKey) : 0;
? ((BucketCache) this.l2Cache).getRpcRefCount(cacheKey)
: 0;
}
public FirstLevelBlockCache getFirstLevelCache() {

View File

@ -274,7 +274,7 @@ public class HFileBlock implements Cacheable {
newByteBuff = buf.slice();
} else {
int len = buf.limit();
newByteBuff = new SingleByteBuff(ByteBuffer.allocate(len));
newByteBuff = ByteBuff.wrap(ByteBuffer.allocate(len));
newByteBuff.put(0, buf, buf.position(), len);
}
// Read out the BLOCK_METADATA_SPACE content and shove into our HFileBlock.
@ -323,7 +323,7 @@ public class HFileBlock implements Cacheable {
that.prevBlockOffset, that.offset, that.onDiskDataSizeWithHeader, that.nextBlockOnDiskSize,
that.fileContext, that.allocator);
if (bufCopy) {
this.buf = new SingleByteBuff(ByteBuffer.wrap(that.buf.toBytes(0, that.buf.limit())));
this.buf = ByteBuff.wrap(ByteBuffer.wrap(that.buf.toBytes(0, that.buf.limit())));
} else {
this.buf = that.buf.duplicate();
}

View File

@ -28,7 +28,6 @@ import java.util.Set;
import java.util.concurrent.atomic.LongAdder;
import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BucketEntry;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

View File

@ -22,10 +22,8 @@ package org.apache.hadoop.hbase.io.hfile.bucket;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Comparator;
@ -44,7 +42,6 @@ import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.Lock;
@ -54,6 +51,9 @@ import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
@ -62,18 +62,15 @@ import org.apache.hadoop.hbase.io.hfile.BlockPriority;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.CacheStats;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager;
import org.apache.hadoop.hbase.io.hfile.CachedBlock;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.RefCnt;
import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.hbase.util.IdReadWriteLock;
import org.apache.hadoop.hbase.util.IdReadWriteLock.ReferenceType;
import org.apache.hadoop.hbase.util.UnsafeAvailChecker;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@ -82,6 +79,7 @@ import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.hbase.shaded.protobuf.generated.BucketCacheProtos;
/**
@ -204,22 +202,12 @@ public class BucketCache implements BlockCache, HeapSize {
@VisibleForTesting
transient final IdReadWriteLock<Long> offsetLock = new IdReadWriteLock<>(ReferenceType.SOFT);
private final NavigableSet<BlockCacheKey> blocksByHFile =
new ConcurrentSkipListSet<>(new Comparator<BlockCacheKey>() {
@Override
public int compare(BlockCacheKey a, BlockCacheKey b) {
private final NavigableSet<BlockCacheKey> blocksByHFile = new ConcurrentSkipListSet<>((a, b) -> {
int nameComparison = a.getHfileName().compareTo(b.getHfileName());
if (nameComparison != 0) {
return nameComparison;
}
if (a.getOffset() == b.getOffset()) {
return 0;
} else if (a.getOffset() < b.getOffset()) {
return -1;
}
return 1;
}
return Long.compare(a.getOffset(), b.getOffset());
});
/** Statistics thread schedule pool (for heavy debugging, could remove) */
@ -249,16 +237,14 @@ public class BucketCache implements BlockCache, HeapSize {
private float memoryFactor;
public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
int writerThreadNum, int writerQLen, String persistencePath) throws FileNotFoundException,
IOException {
int writerThreadNum, int writerQLen, String persistencePath) throws IOException {
this(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
persistencePath, DEFAULT_ERROR_TOLERATION_DURATION, HBaseConfiguration.create());
}
public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration,
Configuration conf)
throws FileNotFoundException, IOException {
Configuration conf) throws IOException {
this.ioEngine = getIOEngineFromName(ioEngineName, capacity, persistencePath);
this.writerThreads = new WriterThread[writerThreadNum];
long blockNumCapacity = capacity / blockSize;
@ -444,7 +430,8 @@ public class BucketCache implements BlockCache, HeapSize {
LOG.trace("Caching key={}, item={}", cacheKey, cachedItem);
// Stuff the entry into the RAM cache so it can get drained to the persistent store
RAMQueueEntry re =
new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), inMemory);
new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), inMemory,
createRecycler(cacheKey));
/**
* Don't use ramCache.put(cacheKey, re) here. because there may be a existing entry with same
* key in ramCache, the heap size of bucket cache need to update if replacing entry from
@ -509,21 +496,16 @@ public class BucketCache implements BlockCache, HeapSize {
// maybe changed. If we lock BlockCacheKey instead of offset, then we can only check
// existence here.
if (bucketEntry.equals(backingMap.get(key))) {
// TODO : change this area - should be removed after server cells and
// 12295 are available
int len = bucketEntry.getLength();
if (LOG.isTraceEnabled()) {
LOG.trace("Read offset=" + bucketEntry.offset() + ", len=" + len);
}
Cacheable cachedBlock = ioEngine.read(bucketEntry.offset(), len,
bucketEntry.deserializerReference());
long timeTaken = System.nanoTime() - start;
// Read the block from IOEngine based on the bucketEntry's offset and length, NOTICE: the
// block will use the refCnt of bucketEntry, which means if two HFileBlock mapping to
// the same BucketEntry, then all of the three will share the same refCnt.
Cacheable cachedBlock = ioEngine.read(bucketEntry);
// RPC start to reference, so retain here.
cachedBlock.retain();
// Update the cache statistics.
if (updateCacheMetrics) {
cacheStats.hit(caching, key.isPrimary(), key.getBlockType());
cacheStats.ioHit(timeTaken);
}
if (cachedBlock.getMemoryType() == MemoryType.SHARED) {
bucketEntry.incrementRefCountAndGet();
cacheStats.ioHit(System.nanoTime() - start);
}
bucketEntry.access(accessCount.incrementAndGet());
if (this.ioErrorStartTime > 0) {
@ -554,40 +536,58 @@ public class BucketCache implements BlockCache, HeapSize {
}
}
/**
* Try to evict the block from {@link BlockCache} by force. We'll call this in few cases:<br>
* 1. Close an HFile, and clear all cached blocks. <br>
* 2. Call {@link Admin#clearBlockCache(TableName)} to clear all blocks for a given table.<br>
* <p>
* Firstly, we'll try to remove the block from RAMCache. If it doesn't exist in RAMCache, then try
* to evict from backingMap. Here we only need to free the reference from bucket cache by calling
* {@link BucketEntry#markedAsEvicted}. If there're still some RPC referring this block, block can
* only be de-allocated when all of them release the block.
* <p>
* NOTICE: we need to grab the write offset lock firstly before releasing the reference from
* bucket cache. if we don't, we may read an {@link BucketEntry} with refCnt = 0 when
* {@link BucketCache#getBlock(BlockCacheKey, boolean, boolean, boolean)}, it's a memory leak.
* @param cacheKey Block to evict
* @return true to indicate whether we've evicted successfully or not.
*/
@Override
public boolean evictBlock(BlockCacheKey cacheKey) {
return evictBlock(cacheKey, true);
}
// does not check for the ref count. Just tries to evict it if found in the
// bucket map
private boolean forceEvict(BlockCacheKey cacheKey) {
if (!cacheEnabled) {
return false;
}
boolean existed = removeFromRamCache(cacheKey);
BucketEntry bucketEntry = backingMap.get(cacheKey);
if (bucketEntry == null) {
BucketEntry be = backingMap.get(cacheKey);
if (be == null) {
if (existed) {
cacheStats.evicted(0, cacheKey.isPrimary());
return true;
}
return existed;
} else {
return false;
return be.withWriteLock(offsetLock, be::markAsEvicted);
}
}
ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset());
try {
lock.writeLock().lock();
if (backingMap.remove(cacheKey, bucketEntry)) {
blockEvicted(cacheKey, bucketEntry, !existed);
} else {
return false;
private Recycler createRecycler(BlockCacheKey cacheKey) {
return () -> {
if (!cacheEnabled) {
return;
}
} finally {
lock.writeLock().unlock();
boolean existed = removeFromRamCache(cacheKey);
BucketEntry be = backingMap.get(cacheKey);
if (be == null && existed) {
cacheStats.evicted(0, cacheKey.isPrimary());
} else if (be != null) {
be.withWriteLock(offsetLock, () -> {
if (backingMap.remove(cacheKey, be)) {
blockEvicted(cacheKey, be, !existed);
cacheStats.evicted(be.getCachedTime(), cacheKey.isPrimary());
}
cacheStats.evicted(bucketEntry.getCachedTime(), cacheKey.isPrimary());
return true;
return null;
});
}
};
}
private boolean removeFromRamCache(BlockCacheKey cacheKey) {
@ -599,53 +599,6 @@ public class BucketCache implements BlockCache, HeapSize {
});
}
public boolean evictBlock(BlockCacheKey cacheKey, boolean deletedBlock) {
if (!cacheEnabled) {
return false;
}
boolean existed = removeFromRamCache(cacheKey);
BucketEntry bucketEntry = backingMap.get(cacheKey);
if (bucketEntry == null) {
if (existed) {
cacheStats.evicted(0, cacheKey.isPrimary());
return true;
} else {
return false;
}
}
ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset());
try {
lock.writeLock().lock();
int refCount = bucketEntry.getRefCount();
if (refCount == 0) {
if (backingMap.remove(cacheKey, bucketEntry)) {
blockEvicted(cacheKey, bucketEntry, !existed);
} else {
return false;
}
} else {
if(!deletedBlock) {
if (LOG.isDebugEnabled()) {
LOG.debug("This block " + cacheKey + " is still referred by " + refCount
+ " readers. Can not be freed now");
}
return false;
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("This block " + cacheKey + " is still referred by " + refCount
+ " readers. Can not be freed now. Hence will mark this"
+ " for evicting at a later point");
}
bucketEntry.markForEvict();
}
}
} finally {
lock.writeLock().unlock();
}
cacheStats.evicted(bucketEntry.getCachedTime(), cacheKey.isPrimary());
return true;
}
/*
* Statistics thread. Periodically output cache statistics to the log.
*/
@ -732,19 +685,17 @@ public class BucketCache implements BlockCache, HeapSize {
if (completelyFreeBucketsNeeded != 0) {
// First we will build a set where the offsets are reference counted, usually
// this set is small around O(Handler Count) unless something else is wrong
Set<Integer> inUseBuckets = new HashSet<Integer>();
for (BucketEntry entry : backingMap.values()) {
if (entry.getRefCount() != 0) {
inUseBuckets.add(bucketAllocator.getBucketIndex(entry.offset()));
Set<Integer> inUseBuckets = new HashSet<>();
backingMap.forEach((k, be) -> {
if (be.isRpcRef()) {
inUseBuckets.add(bucketAllocator.getBucketIndex(be.offset()));
}
}
Set<Integer> candidateBuckets = bucketAllocator.getLeastFilledBuckets(
inUseBuckets, completelyFreeBucketsNeeded);
});
Set<Integer> candidateBuckets =
bucketAllocator.getLeastFilledBuckets(inUseBuckets, completelyFreeBucketsNeeded);
for (Map.Entry<BlockCacheKey, BucketEntry> entry : backingMap.entrySet()) {
if (candidateBuckets.contains(bucketAllocator
.getBucketIndex(entry.getValue().offset()))) {
evictBlock(entry.getKey(), false);
if (candidateBuckets.contains(bucketAllocator.getBucketIndex(entry.getValue().offset()))) {
entry.getValue().withWriteLock(offsetLock, entry.getValue()::markStaleAsEvicted);
}
}
}
@ -921,7 +872,9 @@ public class BucketCache implements BlockCache, HeapSize {
// Blocks
entries = getRAMQueueEntries(inputQueue, entries);
} catch (InterruptedException ie) {
if (!cacheEnabled) break;
if (!cacheEnabled || !writerEnabled) {
break;
}
}
doDrain(entries);
} catch (Exception ioe) {
@ -949,13 +902,10 @@ public class BucketCache implements BlockCache, HeapSize {
private void putIntoBackingMap(BlockCacheKey key, BucketEntry bucketEntry) {
BucketEntry previousEntry = backingMap.put(key, bucketEntry);
if (previousEntry != null && previousEntry != bucketEntry) {
ReentrantReadWriteLock lock = offsetLock.getLock(previousEntry.offset());
lock.writeLock().lock();
try {
previousEntry.withWriteLock(offsetLock, () -> {
blockEvicted(key, previousEntry, false);
} finally {
lock.writeLock().unlock();
}
return null;
});
}
}
@ -1049,22 +999,13 @@ public class BucketCache implements BlockCache, HeapSize {
});
if (!existed && bucketEntries[i] != null) {
// Block should have already been evicted. Remove it and free space.
ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntries[i].offset());
try {
lock.writeLock().lock();
int refCount = bucketEntries[i].getRefCount();
if (refCount == 0) {
if (backingMap.remove(key, bucketEntries[i])) {
blockEvicted(key, bucketEntries[i], false);
} else {
bucketEntries[i].markForEvict();
}
} else {
bucketEntries[i].markForEvict();
}
} finally {
lock.writeLock().unlock();
final BucketEntry bucketEntry = bucketEntries[i];
bucketEntry.withWriteLock(offsetLock, () -> {
if (backingMap.remove(key, bucketEntry)) {
blockEvicted(key, bucketEntry, false);
}
return null;
});
}
}
@ -1077,17 +1018,16 @@ public class BucketCache implements BlockCache, HeapSize {
}
/**
* Blocks until elements available in {@code q} then tries to grab as many as possible
* before returning.
* @param receptacle Where to stash the elements taken from queue. We clear before we use it
* just in case.
* Blocks until elements available in {@code q} then tries to grab as many as possible before
* returning.
* @param receptacle Where to stash the elements taken from queue. We clear before we use it just
* in case.
* @param q The queue to take from.
* @return {@code receptacle} laden with elements taken from the queue or empty if none found.
*/
@VisibleForTesting
static List<RAMQueueEntry> getRAMQueueEntries(final BlockingQueue<RAMQueueEntry> q,
final List<RAMQueueEntry> receptacle)
throws InterruptedException {
static List<RAMQueueEntry> getRAMQueueEntries(BlockingQueue<RAMQueueEntry> q,
List<RAMQueueEntry> receptacle) throws InterruptedException {
// Clear sets all entries to null and sets size to 0. We retain allocations. Presume it
// ok even if list grew to accommodate thousands.
receptacle.clear();
@ -1313,155 +1253,6 @@ public class BucketCache implements BlockCache, HeapSize {
return numEvicted;
}
/**
* Item in cache. We expect this to be where most memory goes. Java uses 8
* bytes just for object headers; after this, we want to use as little as
* possible - so we only use 8 bytes, but in order to do so we end up messing
* around with all this Java casting stuff. Offset stored as 5 bytes that make
* up the long. Doubt we'll see devices this big for ages. Offsets are divided
* by 256. So 5 bytes gives us 256TB or so.
*/
static class BucketEntry implements Serializable {
private static final long serialVersionUID = -6741504807982257534L;
// access counter comparator, descending order
static final Comparator<BucketEntry> COMPARATOR = Comparator
.comparingLong(BucketEntry::getAccessCounter).reversed();
private int offsetBase;
private int length;
private byte offset1;
/**
* The index of the deserializer that can deserialize this BucketEntry content.
* See {@link CacheableDeserializerIdManager} for hosting of index to serializers.
*/
byte deserialiserIndex;
private volatile long accessCounter;
private BlockPriority priority;
/**
* Time this block was cached. Presumes we are created just before we are added to the cache.
*/
private final long cachedTime = System.nanoTime();
BucketEntry(long offset, int length, long accessCounter, boolean inMemory) {
setOffset(offset);
this.length = length;
this.accessCounter = accessCounter;
if (inMemory) {
this.priority = BlockPriority.MEMORY;
} else {
this.priority = BlockPriority.SINGLE;
}
}
long offset() { // Java has no unsigned numbers
long o = ((long) offsetBase) & 0xFFFFFFFFL; //This needs the L cast otherwise it will be sign extended as a negative number.
o += (((long) (offset1)) & 0xFF) << 32; //The 0xFF here does not need the L cast because it is treated as a positive int.
return o << 8;
}
private void setOffset(long value) {
assert (value & 0xFF) == 0;
value >>= 8;
offsetBase = (int) value;
offset1 = (byte) (value >> 32);
}
public int getLength() {
return length;
}
protected CacheableDeserializer<Cacheable> deserializerReference() {
return CacheableDeserializerIdManager.getDeserializer(deserialiserIndex);
}
protected void setDeserialiserReference(CacheableDeserializer<Cacheable> deserializer) {
this.deserialiserIndex = (byte) deserializer.getDeserialiserIdentifier();
}
public long getAccessCounter() {
return accessCounter;
}
/**
* Block has been accessed. Update its local access counter.
*/
public void access(long accessCounter) {
this.accessCounter = accessCounter;
if (this.priority == BlockPriority.SINGLE) {
this.priority = BlockPriority.MULTI;
}
}
public BlockPriority getPriority() {
return this.priority;
}
public long getCachedTime() {
return cachedTime;
}
protected int getRefCount() {
return 0;
}
protected int incrementRefCountAndGet() {
return 0;
}
protected int decrementRefCountAndGet() {
return 0;
}
protected boolean isMarkedForEvict() {
return false;
}
protected void markForEvict() {
// noop;
}
}
static class SharedMemoryBucketEntry extends BucketEntry {
private static final long serialVersionUID = -2187147283772338481L;
// Set this when we were not able to forcefully evict the block
private volatile boolean markedForEvict;
private AtomicInteger refCount = new AtomicInteger(0);
SharedMemoryBucketEntry(long offset, int length, long accessCounter, boolean inMemory) {
super(offset, length, accessCounter, inMemory);
}
@Override
protected int getRefCount() {
return this.refCount.get();
}
@Override
protected int incrementRefCountAndGet() {
return this.refCount.incrementAndGet();
}
@Override
protected int decrementRefCountAndGet() {
return this.refCount.decrementAndGet();
}
@Override
protected boolean isMarkedForEvict() {
return this.markedForEvict;
}
@Override
protected void markForEvict() {
this.markedForEvict = true;
}
}
/**
* Used to group bucket entries into priority buckets. There will be a
* BucketEntryGroup for each priority (single, multi, memory). Once bucketed,
@ -1491,8 +1282,9 @@ public class BucketCache implements BlockCache, HeapSize {
// TODO avoid a cycling siutation. We find no block which is not in use and so no way to free
// What to do then? Caching attempt fail? Need some changes in cacheBlock API?
while ((entry = queue.pollLast()) != null) {
if (evictBlock(entry.getKey(), false)) {
freedBytes += entry.getValue().getLength();
BucketEntry be = entry.getValue();
if (be.withWriteLock(offsetLock, be::markStaleAsEvicted)) {
freedBytes += be.getLength();
}
if (freedBytes >= toFree) {
return freedBytes;
@ -1515,17 +1307,19 @@ public class BucketCache implements BlockCache, HeapSize {
*/
@VisibleForTesting
static class RAMQueueEntry {
private BlockCacheKey key;
private Cacheable data;
private final BlockCacheKey key;
private final Cacheable data;
private long accessCounter;
private boolean inMemory;
private final Recycler recycler;
public RAMQueueEntry(BlockCacheKey bck, Cacheable data, long accessCounter,
boolean inMemory) {
RAMQueueEntry(BlockCacheKey bck, Cacheable data, long accessCounter, boolean inMemory,
Recycler recycler) {
this.key = bck;
this.data = data;
this.accessCounter = accessCounter;
this.inMemory = inMemory;
this.recycler = recycler;
}
public Cacheable getData() {
@ -1540,30 +1334,19 @@ public class BucketCache implements BlockCache, HeapSize {
this.accessCounter = accessCounter;
}
private BucketEntry getBucketEntry(IOEngine ioEngine, long offset, int len) {
if (ioEngine.usesSharedMemory()) {
if (UnsafeAvailChecker.isAvailable()) {
return new UnsafeSharedMemoryBucketEntry(offset, len, accessCounter, inMemory);
} else {
return new SharedMemoryBucketEntry(offset, len, accessCounter, inMemory);
}
} else {
return new BucketEntry(offset, len, accessCounter, inMemory);
}
}
public BucketEntry writeToCache(final IOEngine ioEngine, final BucketAllocator bucketAllocator,
public BucketEntry writeToCache(final IOEngine ioEngine, final BucketAllocator alloc,
final LongAdder realCacheSize) throws IOException {
int len = data.getSerializedLength();
// This cacheable thing can't be serialized
if (len == 0) {
return null;
}
long offset = bucketAllocator.allocateBlock(len);
long offset = alloc.allocateBlock(len);
boolean succ = false;
BucketEntry bucketEntry;
BucketEntry bucketEntry = null;
try {
bucketEntry = getBucketEntry(ioEngine, offset, len);
bucketEntry =
new BucketEntry(offset, len, accessCounter, inMemory, RefCnt.create(recycler));
bucketEntry.setDeserialiserReference(data.getDeserializer());
if (data instanceof HFileBlock) {
// If an instance of HFileBlock, save on some allocations.
@ -1581,7 +1364,7 @@ public class BucketCache implements BlockCache, HeapSize {
succ = true;
} finally {
if (!succ) {
bucketAllocator.freeBlock(offset);
alloc.freeBlock(offset);
}
}
realCacheSize.add(len);
@ -1697,25 +1480,11 @@ public class BucketCache implements BlockCache, HeapSize {
return null;
}
@Override
public void returnBlock(BlockCacheKey cacheKey, Cacheable block) {
block.release();
if (block.getMemoryType() == MemoryType.SHARED) {
BucketEntry bucketEntry = backingMap.get(cacheKey);
if (bucketEntry != null) {
int refCount = bucketEntry.decrementRefCountAndGet();
if (refCount == 0 && bucketEntry.isMarkedForEvict()) {
evictBlock(cacheKey);
}
}
}
}
@VisibleForTesting
public int getRefCount(BlockCacheKey cacheKey) {
public int getRpcRefCount(BlockCacheKey cacheKey) {
BucketEntry bucketEntry = backingMap.get(cacheKey);
if (bucketEntry != null) {
return bucketEntry.getRefCount();
return bucketEntry.refCnt() - (bucketEntry.markedAsEvicted.get() ? 0 : 1);
}
return 0;
}

View File

@ -0,0 +1,239 @@
/**
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile.bucket;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Comparator;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.hbase.io.hfile.BlockPriority;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.HBaseReferenceCounted;
import org.apache.hadoop.hbase.nio.RefCnt;
import org.apache.hadoop.hbase.util.IdReadWriteLock;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Item in cache. We expect this to be where most memory goes. Java uses 8 bytes just for object
* headers; after this, we want to use as little as possible - so we only use 8 bytes, but in order
* to do so we end up messing around with all this Java casting stuff. Offset stored as 5 bytes that
* make up the long. Doubt we'll see devices this big for ages. Offsets are divided by 256. So 5
* bytes gives us 256TB or so.
*/
@InterfaceAudience.Private
class BucketEntry implements HBaseReferenceCounted {
// access counter comparator, descending order
static final Comparator<BucketEntry> COMPARATOR =
Comparator.comparingLong(BucketEntry::getAccessCounter).reversed();
private int offsetBase;
private int length;
private byte offset1;
/**
* The index of the deserializer that can deserialize this BucketEntry content. See
* {@link CacheableDeserializerIdManager} for hosting of index to serializers.
*/
byte deserialiserIndex;
private volatile long accessCounter;
private BlockPriority priority;
/**
* The RefCnt means how many paths are referring the {@link BucketEntry}, each RPC reading path is
* considering as one path, the {@link BucketCache#backingMap} reference is also considered a
* path. NOTICE that if two read RPC path hit the same {@link BucketEntry}, then the HFileBlocks
* the two RPC referred will share the same refCnt instance with the BucketEntry. so the refCnt
* will increase or decrease as the following: <br>
* 1. when writerThread flush the block into IOEngine and add the bucketEntry into backingMap, the
* refCnt ++; <br>
* 2. If BucketCache evict the block and move the bucketEntry out of backingMap, the refCnt--; it
* usually happen when HFile is closing or someone call the clearBucketCache by force. <br>
* 3. The read RPC path start to refer the block which is backend by the memory area in
* bucketEntry, then refCnt ++ ; <br>
* 4. The read RPC patch shipped the response, and release the block. then refCnt--; <br>
* Once the refCnt decrease to zero, then the {@link BucketAllocator} will free the block area.
*/
private final RefCnt refCnt;
final AtomicBoolean markedAsEvicted;
/**
* Time this block was cached. Presumes we are created just before we are added to the cache.
*/
private final long cachedTime = System.nanoTime();
BucketEntry(long offset, int length, long accessCounter, boolean inMemory) {
this(offset, length, accessCounter, inMemory, RefCnt.create());
}
BucketEntry(long offset, int length, long accessCounter, boolean inMemory, RefCnt refCnt) {
setOffset(offset);
this.length = length;
this.accessCounter = accessCounter;
this.priority = inMemory ? BlockPriority.MEMORY : BlockPriority.MULTI;
this.refCnt = refCnt;
this.markedAsEvicted = new AtomicBoolean(false);
}
long offset() {
// Java has no unsigned numbers, so this needs the L cast otherwise it will be sign extended
// as a negative number.
long o = ((long) offsetBase) & 0xFFFFFFFFL;
// The 0xFF here does not need the L cast because it is treated as a positive int.
o += (((long) (offset1)) & 0xFF) << 32;
return o << 8;
}
private void setOffset(long value) {
assert (value & 0xFF) == 0;
value >>= 8;
offsetBase = (int) value;
offset1 = (byte) (value >> 32);
}
public int getLength() {
return length;
}
CacheableDeserializer<Cacheable> deserializerReference() {
return CacheableDeserializerIdManager.getDeserializer(deserialiserIndex);
}
void setDeserialiserReference(CacheableDeserializer<Cacheable> deserializer) {
this.deserialiserIndex = (byte) deserializer.getDeserialiserIdentifier();
}
long getAccessCounter() {
return accessCounter;
}
/**
* Block has been accessed. Update its local access counter.
*/
void access(long accessCounter) {
this.accessCounter = accessCounter;
if (this.priority == BlockPriority.SINGLE) {
this.priority = BlockPriority.MULTI;
}
}
public BlockPriority getPriority() {
return this.priority;
}
long getCachedTime() {
return cachedTime;
}
/**
* The {@link BucketCache} will try to release its reference to this BucketEntry many times. we
* must make sure the idempotent, otherwise it'll decrease the RPC's reference count in advance,
* then for RPC memory leak happen.
* @return true if we deallocate this entry successfully.
*/
boolean markAsEvicted() {
if (markedAsEvicted.compareAndSet(false, true)) {
return this.release();
}
return false;
}
/**
* Mark as evicted only when NO RPC references. Mainly used for eviction when cache size exceed
* the max acceptable size.
* @return true if we deallocate this entry successfully.
*/
boolean markStaleAsEvicted() {
if (!markedAsEvicted.get() && this.refCnt() == 1) {
// The only reference was coming from backingMap, now release the stale entry.
return this.markAsEvicted();
}
return false;
}
/**
* Check whether have some RPC patch referring this block. There're two case: <br>
* 1. If current refCnt is greater than 1, there must be at least one referring RPC path; <br>
* 2. If current refCnt is equal to 1 and the markedAtEvicted is true, the it means backingMap has
* released its reference, the remaining reference can only be from RPC path. <br>
* We use this check to decide whether we can free the block area: when cached size exceed the
* acceptable size, our eviction policy will choose those stale blocks without any RPC reference
* and the RPC referred block will be excluded.
* @return true to indicate there're some RPC referring the block.
*/
boolean isRpcRef() {
boolean evicted = markedAsEvicted.get();
return this.refCnt() > 1 || (evicted && refCnt() == 1);
}
Cacheable wrapAsCacheable(ByteBuffer[] buffers, MemoryType memoryType) throws IOException {
ByteBuff buf = ByteBuff.wrap(buffers, this.refCnt);
return this.deserializerReference().deserialize(buf, true, memoryType);
}
interface BucketEntryHandler<T> {
T handle();
}
<T> T withWriteLock(IdReadWriteLock<Long> offsetLock, BucketEntryHandler<T> handler) {
ReentrantReadWriteLock lock = offsetLock.getLock(this.offset());
try {
lock.writeLock().lock();
return handler.handle();
} finally {
lock.writeLock().unlock();
}
}
@Override
public int refCnt() {
return this.refCnt.refCnt();
}
@Override
public BucketEntry retain() {
refCnt.retain();
return this;
}
/**
* We've three cases to release refCnt now: <br>
* 1. BucketCache#evictBlock, it will release the backingMap's reference by force because we're
* closing file or clear the bucket cache or some corruption happen. when all rpc references gone,
* then free the area in bucketAllocator. <br>
* 2. BucketCache#returnBlock . when rpc shipped, we'll release the block, only when backingMap
* also release its refCnt (case.1 will do this) and no other rpc reference, then it will free the
* area in bucketAllocator. <br>
* 3.evict those block without any rpc reference if cache size exceeded. we'll only free those
* blocks with zero rpc reference count, as the {@link BucketEntry#markStaleAsEvicted()} do.
* @return true to indicate we've decreased to zero and do the de-allocation.
*/
@Override
public boolean release() {
return refCnt.release();
}
}

View File

@ -50,9 +50,9 @@ final class BucketProtoUtils {
}
private static BucketCacheProtos.BackingMap toPB(
Map<BlockCacheKey, BucketCache.BucketEntry> backingMap) {
Map<BlockCacheKey, BucketEntry> backingMap) {
BucketCacheProtos.BackingMap.Builder builder = BucketCacheProtos.BackingMap.newBuilder();
for (Map.Entry<BlockCacheKey, BucketCache.BucketEntry> entry : backingMap.entrySet()) {
for (Map.Entry<BlockCacheKey, BucketEntry> entry : backingMap.entrySet()) {
builder.addEntry(BucketCacheProtos.BackingMapEntry.newBuilder()
.setKey(toPB(entry.getKey()))
.setValue(toPB(entry.getValue()))
@ -101,7 +101,7 @@ final class BucketProtoUtils {
}
}
private static BucketCacheProtos.BucketEntry toPB(BucketCache.BucketEntry entry) {
private static BucketCacheProtos.BucketEntry toPB(BucketEntry entry) {
return BucketCacheProtos.BucketEntry.newBuilder()
.setOffset(entry.offset())
.setLength(entry.getLength())
@ -124,16 +124,16 @@ final class BucketProtoUtils {
}
}
static ConcurrentHashMap<BlockCacheKey, BucketCache.BucketEntry> fromPB(
static ConcurrentHashMap<BlockCacheKey, BucketEntry> fromPB(
Map<Integer, String> deserializers, BucketCacheProtos.BackingMap backingMap)
throws IOException {
ConcurrentHashMap<BlockCacheKey, BucketCache.BucketEntry> result = new ConcurrentHashMap<>();
ConcurrentHashMap<BlockCacheKey, BucketEntry> result = new ConcurrentHashMap<>();
for (BucketCacheProtos.BackingMapEntry entry : backingMap.getEntryList()) {
BucketCacheProtos.BlockCacheKey protoKey = entry.getKey();
BlockCacheKey key = new BlockCacheKey(protoKey.getHfilename(), protoKey.getOffset(),
protoKey.getPrimaryReplicaBlock(), fromPb(protoKey.getBlockType()));
BucketCacheProtos.BucketEntry protoValue = entry.getValue();
BucketCache.BucketEntry value = new BucketCache.BucketEntry(
BucketEntry value = new BucketEntry(
protoValue.getOffset(),
protoValue.getLength(),
protoValue.getAccessCounter(),

View File

@ -23,7 +23,6 @@ import java.nio.ByteBuffer;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.ByteBufferAllocator;
@ -100,16 +99,15 @@ public class ByteBufferIOEngine implements IOEngine {
}
@Override
public Cacheable read(long offset, int length, CacheableDeserializer<Cacheable> deserializer)
throws IOException {
ByteBuff dstBuffer = bufferArray.asSubByteBuff(offset, length);
public Cacheable read(BucketEntry be) throws IOException {
ByteBuffer[] buffers = bufferArray.asSubByteBuffers(be.offset(), be.getLength());
// Here the buffer that is created directly refers to the buffer in the actual buckets.
// When any cell is referring to the blocks created out of these buckets then it means that
// those cells are referring to a shared memory area which if evicted by the BucketCache would
// lead to corruption of results. Hence we set the type of the buffer as SHARED_MEMORY
// so that the readers using this block are aware of this fact and do the necessary action
// to prevent eviction till the results are either consumed or copied
return deserializer.deserialize(dstBuffer, true, MemoryType.SHARED);
return be.wrapAsCacheable(buffers, MemoryType.SHARED);
}
/**

View File

@ -21,11 +21,9 @@ package org.apache.hadoop.hbase.io.hfile.bucket;
import java.util.Comparator;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BucketEntry;
import org.apache.hbase.thirdparty.com.google.common.collect.MinMaxPriorityQueue;
@ -43,6 +41,9 @@ import org.apache.hbase.thirdparty.com.google.common.collect.MinMaxPriorityQueue
@InterfaceAudience.Private
public class CachedEntryQueue {
private static final Comparator<Map.Entry<BlockCacheKey, BucketEntry>> COMPARATOR =
(a, b) -> BucketEntry.COMPARATOR.compare(a.getValue(), b.getValue());
private MinMaxPriorityQueue<Map.Entry<BlockCacheKey, BucketEntry>> queue;
private long cacheSize;
@ -57,15 +58,7 @@ public class CachedEntryQueue {
if (initialSize == 0) {
initialSize++;
}
queue = MinMaxPriorityQueue.orderedBy(new Comparator<Map.Entry<BlockCacheKey, BucketEntry>>() {
@Override
public int compare(Entry<BlockCacheKey, BucketEntry> entry1,
Entry<BlockCacheKey, BucketEntry> entry2) {
return BucketEntry.COMPARATOR.compare(entry1.getValue(), entry2.getValue());
}
}).expectedSize(initialSize).create();
queue = MinMaxPriorityQueue.orderedBy(COMPARATOR).expectedSize(initialSize).create();
cacheSize = 0;
this.maxSize = maxSize;
}
@ -112,12 +105,4 @@ public class CachedEntryQueue {
public Map.Entry<BlockCacheKey, BucketEntry> pollLast() {
return queue.pollLast();
}
/**
* Total size of all elements in this queue.
* @return size of all elements currently in queue, in bytes
*/
public long cacheSize() {
return cacheSize;
}
}

View File

@ -16,19 +16,16 @@
*/
package org.apache.hadoop.hbase.io.hfile.bucket;
import static org.apache.hadoop.hbase.io.ByteBuffAllocator.HEAP;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.yetus.audience.InterfaceAudience;
/**
* IO engine that stores data to a file on the local block device using memory mapping
* mechanism
* IO engine that stores data to a file on the local block device using memory mapping mechanism
*/
@InterfaceAudience.Private
public class ExclusiveMemoryMmapIOEngine extends FileMmapIOEngine {
@ -38,10 +35,10 @@ public class ExclusiveMemoryMmapIOEngine extends FileMmapIOEngine {
}
@Override
public Cacheable read(long offset, int length, CacheableDeserializer<Cacheable> deserializer)
throws IOException {
ByteBuff dst = HEAP.allocate(length);
bufferArray.read(offset, dst);
return deserializer.deserialize(dst.position(0).limit(length), true, MemoryType.EXCLUSIVE);
public Cacheable read(BucketEntry be) throws IOException {
ByteBuff dst = ByteBuff.wrap(ByteBuffer.allocate(be.getLength()));
bufferArray.read(be.offset(), dst);
dst.position(0).limit(be.getLength());
return be.wrapAsCacheable(dst.nioByteBuffers(), MemoryType.EXCLUSIVE);
}
}

View File

@ -27,11 +27,11 @@ import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.util.Arrays;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@ -121,30 +121,29 @@ public class FileIOEngine implements IOEngine {
/**
* Transfers data from file to the given byte buffer
* @param offset The offset in the file where the first byte to be read
* @param length The length of buffer that should be allocated for reading
* from the file channel
* @return number of bytes read
* @throws IOException
* @param be an {@link BucketEntry} which maintains an (offset, len, refCnt)
* @return the {@link Cacheable} with block data inside.
* @throws IOException if any IO error happen.
*/
@Override
public Cacheable read(long offset, int length, CacheableDeserializer<Cacheable> deserializer)
throws IOException {
public Cacheable read(BucketEntry be) throws IOException {
long offset = be.offset();
int length = be.getLength();
Preconditions.checkArgument(length >= 0, "Length of read can not be less than 0.");
ByteBuffer dstBuffer = ByteBuffer.allocate(length);
if (length != 0) {
accessFile(readAccessor, dstBuffer, offset);
// The buffer created out of the fileChannel is formed by copying the data from the file
// Hence in this case there is no shared memory that we point to. Even if the BucketCache evicts
// this buffer from the file the data is already copied and there is no need to ensure that
// the results are not corrupted before consuming them.
// Hence in this case there is no shared memory that we point to. Even if the BucketCache
// evicts this buffer from the file the data is already copied and there is no need to
// ensure that the results are not corrupted before consuming them.
if (dstBuffer.limit() != length) {
throw new RuntimeException("Only " + dstBuffer.limit() + " bytes read, " + length
+ " expected");
throw new IllegalArgumentIOException(
"Only " + dstBuffer.limit() + " bytes read, " + length + " expected");
}
}
dstBuffer.rewind();
return deserializer.deserialize(new SingleByteBuff(dstBuffer), true, MemoryType.EXCLUSIVE);
return be.wrapAsCacheable(new ByteBuffer[] { dstBuffer }, MemoryType.EXCLUSIVE);
}
@VisibleForTesting

View File

@ -24,7 +24,6 @@ import java.nio.channels.FileChannel;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.ByteBufferAllocator;
import org.apache.hadoop.hbase.util.ByteBufferArray;
@ -101,8 +100,7 @@ public abstract class FileMmapIOEngine implements IOEngine {
}
@Override
public abstract Cacheable read(long offset, int length,
CacheableDeserializer<Cacheable> deserializer) throws IOException;
public abstract Cacheable read(BucketEntry be) throws IOException;
/**
* Transfers data from the given byte buffer to file

View File

@ -23,7 +23,6 @@ import java.nio.ByteBuffer;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
import org.apache.hadoop.hbase.nio.ByteBuff;
/**
@ -48,15 +47,12 @@ public interface IOEngine {
/**
* Transfers data from IOEngine to a Cacheable object.
* @param length How many bytes to be read from the offset
* @param offset The offset in the IO engine where the first byte to be read
* @param deserializer The deserializer to be used to make a Cacheable from the data.
* @return Cacheable
* @throws IOException
* @throws RuntimeException when the length of the ByteBuff read is less than 'len'
* @param be maintains an (offset,len,refCnt) inside.
* @return Cacheable which will wrap the NIO ByteBuffers from IOEngine.
* @throws IOException when any IO error happen
* @throws IllegalArgumentException when the length of the ByteBuff read is less than 'len'
*/
Cacheable read(long offset, int length, CacheableDeserializer<Cacheable> deserializer)
throws IOException;
Cacheable read(BucketEntry be) throws IOException;
/**
* Transfers data from the given byte buffer to IOEngine

View File

@ -18,11 +18,10 @@
package org.apache.hadoop.hbase.io.hfile.bucket;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.yetus.audience.InterfaceAudience;
/**
@ -50,15 +49,14 @@ public class SharedMemoryMmapIOEngine extends FileMmapIOEngine {
}
@Override
public Cacheable read(long offset, int length, CacheableDeserializer<Cacheable> deserializer)
throws IOException {
ByteBuff dstBuffer = bufferArray.asSubByteBuff(offset, length);
public Cacheable read(BucketEntry be) throws IOException {
ByteBuffer[] buffers = bufferArray.asSubByteBuffers(be.offset(), be.getLength());
// Here the buffer that is created directly refers to the buffer in the actual buckets.
// When any cell is referring to the blocks created out of these buckets then it means that
// those cells are referring to a shared memory area which if evicted by the BucketCache would
// lead to corruption of results. Hence we set the type of the buffer as SHARED_MEMORY
// so that the readers using this block are aware of this fact and do the necessary action
// to prevent eviction till the results are either consumed or copied
return deserializer.deserialize(dstBuffer, true, MemoryType.SHARED);
return be.wrapAsCacheable(buffers, MemoryType.SHARED);
}
}

View File

@ -1,81 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile.bucket;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BucketEntry;
import org.apache.hadoop.hbase.util.UnsafeAccess;
import org.apache.yetus.audience.InterfaceAudience;
import sun.misc.Unsafe;
@InterfaceAudience.Private
public class UnsafeSharedMemoryBucketEntry extends BucketEntry {
private static final long serialVersionUID = 707544024564058801L;
// We are just doing what AtomicInteger doing for the Atomic incrementAndGet/decrementAndGet.
// We are avoiding the need to have a field of AtomicIneger type and have it as just int type.
// We would like to reduce the head overhead per object of this type as much as possible.
// Doing this direct Unsafe usage save us 16 bytes per Object.
// ie Just using 4 bytes for int type than 20 bytes requirement for an AtomicInteger (16 bytes)
// and 4 bytes reference to it.
private static final Unsafe unsafe = UnsafeAccess.theUnsafe;
private static final long refCountOffset;
static {
try {
refCountOffset = unsafe
.objectFieldOffset(UnsafeSharedMemoryBucketEntry.class.getDeclaredField("refCount"));
} catch (Exception ex) {
throw new Error(ex);
}
}
// Set this when we were not able to forcefully evict the block
private volatile boolean markedForEvict;
private volatile int refCount = 0;
public UnsafeSharedMemoryBucketEntry(long offset, int length, long accessCounter,
boolean inMemory) {
super(offset, length, accessCounter, inMemory);
}
@Override
protected int getRefCount() {
return this.refCount;
}
@Override
protected int incrementRefCountAndGet() {
return unsafe.getAndAddInt(this, refCountOffset, 1) + 1;
}
@Override
protected int decrementRefCountAndGet() {
return unsafe.getAndAddInt(this, refCountOffset, -1) - 1;
}
@Override
protected boolean isMarkedForEvict() {
return this.markedForEvict;
}
@Override
protected void markForEvict() {
this.markedForEvict = true;
}
}

View File

@ -30,7 +30,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
@ -441,9 +440,9 @@ public class TestBlockEvictionFromClient {
CachedBlock next = iterator.next();
BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
if (cache instanceof BucketCache) {
refCount = ((BucketCache) cache).getRefCount(cacheKey);
refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
} else if (cache instanceof CombinedBlockCache) {
refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
} else {
continue;
}
@ -536,9 +535,9 @@ public class TestBlockEvictionFromClient {
CachedBlock next = iterator.next();
BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
if (cache instanceof BucketCache) {
refCount = ((BucketCache) cache).getRefCount(cacheKey);
refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
} else if (cache instanceof CombinedBlockCache) {
refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
} else {
continue;
}
@ -670,9 +669,9 @@ public class TestBlockEvictionFromClient {
CachedBlock next = iterator.next();
BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
if (cache instanceof BucketCache) {
refCount = ((BucketCache) cache).getRefCount(cacheKey);
refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
} else if (cache instanceof CombinedBlockCache) {
refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
} else {
continue;
}
@ -758,9 +757,9 @@ public class TestBlockEvictionFromClient {
CachedBlock next = iterator.next();
BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
if (cache instanceof BucketCache) {
refCount = ((BucketCache) cache).getRefCount(cacheKey);
refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
} else if (cache instanceof CombinedBlockCache) {
refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
} else {
continue;
}
@ -925,9 +924,9 @@ public class TestBlockEvictionFromClient {
CachedBlock next = iterator.next();
BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
if (cache instanceof BucketCache) {
refCount = ((BucketCache) cache).getRefCount(cacheKey);
refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
} else if (cache instanceof CombinedBlockCache) {
refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
} else {
continue;
}
@ -952,9 +951,9 @@ public class TestBlockEvictionFromClient {
CachedBlock next = iterator.next();
BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
if (cache instanceof BucketCache) {
refCount = ((BucketCache) cache).getRefCount(cacheKey);
refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
} else if (cache instanceof CombinedBlockCache) {
refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
} else {
continue;
}
@ -1043,9 +1042,9 @@ public class TestBlockEvictionFromClient {
CachedBlock next = iterator.next();
BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
if (cache instanceof BucketCache) {
refCount = ((BucketCache) cache).getRefCount(cacheKey);
refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
} else if (cache instanceof CombinedBlockCache) {
refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
} else {
continue;
}
@ -1079,9 +1078,9 @@ public class TestBlockEvictionFromClient {
CachedBlock next = iterator.next();
BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
if (cache instanceof BucketCache) {
refCount = ((BucketCache) cache).getRefCount(cacheKey);
refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
} else if (cache instanceof CombinedBlockCache) {
refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
} else {
continue;
}
@ -1160,9 +1159,9 @@ public class TestBlockEvictionFromClient {
CachedBlock next = iterator.next();
BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
if (cache instanceof BucketCache) {
refCount = ((BucketCache) cache).getRefCount(cacheKey);
refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
} else if (cache instanceof CombinedBlockCache) {
refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
} else {
continue;
}
@ -1186,9 +1185,9 @@ public class TestBlockEvictionFromClient {
CachedBlock next = iterator.next();
BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
if (cache instanceof BucketCache) {
refCount = ((BucketCache) cache).getRefCount(cacheKey);
refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
} else if (cache instanceof CombinedBlockCache) {
refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
} else {
continue;
}
@ -1214,9 +1213,9 @@ public class TestBlockEvictionFromClient {
CachedBlock next = iterator.next();
BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
if (cache instanceof BucketCache) {
refCount = ((BucketCache) cache).getRefCount(cacheKey);
refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
} else if (cache instanceof CombinedBlockCache) {
refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
} else {
continue;
}
@ -1293,9 +1292,9 @@ public class TestBlockEvictionFromClient {
CachedBlock next = iterator.next();
BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
if (cache instanceof BucketCache) {
refCount = ((BucketCache) cache).getRefCount(cacheKey);
refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
} else if (cache instanceof CombinedBlockCache) {
refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
} else {
continue;
}
@ -1562,8 +1561,6 @@ public class TestBlockEvictionFromClient {
}
public static class CustomInnerRegionObserver implements RegionCoprocessor, RegionObserver {
static final AtomicLong sleepTime = new AtomicLong(0);
static final AtomicBoolean slowDownNext = new AtomicBoolean(false);
static final AtomicInteger countOfNext = new AtomicInteger(0);
static final AtomicInteger countOfGets = new AtomicInteger(0);
static final AtomicBoolean waitForGets = new AtomicBoolean(false);

View File

@ -170,16 +170,15 @@ public class CacheTestUtils {
}
public static void hammerSingleKey(final BlockCache toBeTested,
int BlockSize, int numThreads, int numQueries) throws Exception {
public static void hammerSingleKey(final BlockCache toBeTested, int numThreads, int numQueries)
throws Exception {
final BlockCacheKey key = new BlockCacheKey("key", 0);
final byte[] buf = new byte[5 * 1024];
Arrays.fill(buf, (byte) 5);
final ByteArrayCacheable bac = new ByteArrayCacheable(buf);
Configuration conf = new Configuration();
MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(
conf);
MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(conf);
final AtomicInteger totalQueries = new AtomicInteger();
toBeTested.cacheBlock(key, bac);
@ -188,8 +187,8 @@ public class CacheTestUtils {
TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
@Override
public void doAnAction() throws Exception {
ByteArrayCacheable returned = (ByteArrayCacheable) toBeTested
.getBlock(key, false, false, true);
ByteArrayCacheable returned =
(ByteArrayCacheable) toBeTested.getBlock(key, false, false, true);
if (returned != null) {
assertArrayEquals(buf, returned.buf);
} else {
@ -223,52 +222,6 @@ public class CacheTestUtils {
ctx.stop();
}
public static void hammerEviction(final BlockCache toBeTested, int BlockSize,
int numThreads, int numQueries) throws Exception {
Configuration conf = new Configuration();
MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(
conf);
final AtomicInteger totalQueries = new AtomicInteger();
for (int i = 0; i < numThreads; i++) {
final int finalI = i;
final byte[] buf = new byte[5 * 1024];
TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
@Override
public void doAnAction() throws Exception {
for (int j = 0; j < 100; j++) {
BlockCacheKey key = new BlockCacheKey("key_" + finalI + "_" + j, 0);
Arrays.fill(buf, (byte) (finalI * j));
final ByteArrayCacheable bac = new ByteArrayCacheable(buf);
ByteArrayCacheable gotBack = (ByteArrayCacheable) toBeTested
.getBlock(key, true, false, true);
if (gotBack != null) {
assertArrayEquals(gotBack.buf, bac.buf);
} else {
toBeTested.cacheBlock(key, bac);
}
}
totalQueries.incrementAndGet();
}
};
t.setDaemon(true);
ctx.addThread(t);
}
ctx.startThreads();
while (totalQueries.get() < numQueries && ctx.shouldRun()) {
Thread.sleep(10);
}
ctx.stop();
assertTrue(toBeTested.getStats().getEvictedCount() > 0);
}
public static class ByteArrayCacheable implements Cacheable {
static final CacheableDeserializer<Cacheable> blockDeserializer =
@ -405,8 +358,14 @@ public class CacheTestUtils {
destBuffer.clear();
cache.cacheBlock(key, blockToCache);
Cacheable actualBlock = cache.getBlock(key, false, false, false);
try {
actualBlock.serialize(destBuffer, true);
assertEquals(expectedBuffer, destBuffer);
cache.returnBlock(key, actualBlock);
} finally {
// Release the reference count increased by getBlock.
if (actualBlock != null) {
actualBlock.release();
}
}
}
}

View File

@ -194,7 +194,7 @@ public class TestBucketCache {
@Test
public void testCacheMultiThreadedSingleKey() throws Exception {
CacheTestUtils.hammerSingleKey(cache, BLOCK_SIZE, 2 * NUM_THREADS, 2 * NUM_QUERIES);
CacheTestUtils.hammerSingleKey(cache, 2 * NUM_THREADS, 2 * NUM_QUERIES);
}
@Test
@ -208,6 +208,7 @@ public class TestBucketCache {
while (!cache.backingMap.containsKey(cacheKey) || cache.ramCache.containsKey(cacheKey)) {
Thread.sleep(100);
}
Thread.sleep(1000);
}
// BucketCache.cacheBlock is async, it first adds block to ramCache and writeQueue, then writer
@ -221,29 +222,28 @@ public class TestBucketCache {
@Test
public void testMemoryLeak() throws Exception {
final BlockCacheKey cacheKey = new BlockCacheKey("dummy", 1L);
cacheAndWaitUntilFlushedToBucket(cache, cacheKey, new CacheTestUtils.ByteArrayCacheable(
new byte[10]));
cacheAndWaitUntilFlushedToBucket(cache, cacheKey,
new CacheTestUtils.ByteArrayCacheable(new byte[10]));
long lockId = cache.backingMap.get(cacheKey).offset();
ReentrantReadWriteLock lock = cache.offsetLock.getLock(lockId);
lock.writeLock().lock();
Thread evictThread = new Thread("evict-block") {
@Override
public void run() {
cache.evictBlock(cacheKey);
}
};
evictThread.start();
cache.offsetLock.waitForWaiters(lockId, 1);
cache.blockEvicted(cacheKey, cache.backingMap.remove(cacheKey), true);
cacheAndWaitUntilFlushedToBucket(cache, cacheKey, new CacheTestUtils.ByteArrayCacheable(
new byte[10]));
assertEquals(0, cache.getBlockCount());
cacheAndWaitUntilFlushedToBucket(cache, cacheKey,
new CacheTestUtils.ByteArrayCacheable(new byte[10]));
assertEquals(1, cache.getBlockCount());
lock.writeLock().unlock();
evictThread.join();
assertEquals(1L, cache.getBlockCount());
assertTrue(cache.getCurrentSize() > 0L);
assertTrue("We should have a block!", cache.iterator().hasNext());
assertEquals(0, cache.getBlockCount());
assertEquals(cache.getCurrentSize(), 0L);
}
@Test
@ -416,10 +416,10 @@ public class TestBucketCache {
@Test
public void testOffsetProducesPositiveOutput() {
//This number is picked because it produces negative output if the values isn't ensured to be positive.
//See HBASE-18757 for more information.
// This number is picked because it produces negative output if the values isn't ensured to be
// positive. See HBASE-18757 for more information.
long testValue = 549888460800L;
BucketCache.BucketEntry bucketEntry = new BucketCache.BucketEntry(testValue, 10, 10L, true);
BucketEntry bucketEntry = new BucketEntry(testValue, 10, 10L, true);
assertEquals(testValue, bucketEntry.offset());
}
@ -427,16 +427,15 @@ public class TestBucketCache {
public void testCacheBlockNextBlockMetadataMissing() throws Exception {
int size = 100;
int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
byte[] byteArr = new byte[length];
ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size);
ByteBuffer buf1 = ByteBuffer.allocate(size), buf2 = ByteBuffer.allocate(size);
HFileContext meta = new HFileContextBuilder().build();
ByteBuffAllocator allocator = ByteBuffAllocator.HEAP;
HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, buf,
HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, buf1,
HFileBlock.FILL_HEADER, -1, 52, -1, meta, allocator);
HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, buf,
HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, buf2,
HFileBlock.FILL_HEADER, -1, -1, -1, meta, allocator);
BlockCacheKey key = new BlockCacheKey("key1", 0);
BlockCacheKey key = new BlockCacheKey("testCacheBlockNextBlockMetadataMissing", 0);
ByteBuffer actualBuffer = ByteBuffer.allocate(length);
ByteBuffer block1Buffer = ByteBuffer.allocate(length);
ByteBuffer block2Buffer = ByteBuffer.allocate(length);
@ -448,6 +447,8 @@ public class TestBucketCache {
block1Buffer);
waitUntilFlushedToBucket(cache, key);
assertNotNull(cache.backingMap.get(key));
assertEquals(1, cache.backingMap.get(key).refCnt());
assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());
@ -456,9 +457,10 @@ public class TestBucketCache {
block1Buffer);
assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());
assertEquals(1, cache.backingMap.get(key).refCnt());
// Clear and add blockWithoutNextBlockMetadata
cache.evictBlock(key);
assertTrue(cache.evictBlock(key));
assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());
@ -494,8 +496,8 @@ public class TestBucketCache {
-1, 52, -1, meta, ByteBuffAllocator.HEAP);
HFileBlock blk2 = new HFileBlock(BlockType.DATA, size, size, -1, buf, HFileBlock.FILL_HEADER,
-1, -1, -1, meta, ByteBuffAllocator.HEAP);
RAMQueueEntry re1 = new RAMQueueEntry(key1, blk1, 1, false);
RAMQueueEntry re2 = new RAMQueueEntry(key1, blk2, 1, false);
RAMQueueEntry re1 = new RAMQueueEntry(key1, blk1, 1, false, ByteBuffAllocator.NONE);
RAMQueueEntry re2 = new RAMQueueEntry(key1, blk2, 1, false, ByteBuffAllocator.NONE);
assertFalse(cache.containsKey(key1));
assertNull(cache.putIfAbsent(key1, re1));
@ -542,7 +544,7 @@ public class TestBucketCache {
BucketAllocator allocator = new BucketAllocator(availableSpace, null);
BlockCacheKey key = new BlockCacheKey("dummy", 1L);
RAMQueueEntry re = new RAMQueueEntry(key, block, 1, true);
RAMQueueEntry re = new RAMQueueEntry(key, block, 1, true, ByteBuffAllocator.NONE);
Assert.assertEquals(0, allocator.getUsedSize());
try {

View File

@ -0,0 +1,266 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile.bucket;
import static org.apache.hadoop.hbase.io.ByteBuffAllocator.HEAP;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.WriterThread;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ IOTests.class, MediumTests.class })
public class TestBucketCacheRefCnt {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestBucketCacheRefCnt.class);
private static final String IO_ENGINE = "offheap";
private static final long CAPACITY_SIZE = 32 * 1024 * 1024;
private static final int BLOCK_SIZE = 1024;
private static final int[] BLOCK_SIZE_ARRAY =
new int[] { 64, 128, 256, 512, 1024, 2048, 4096, 8192 };
private static final String PERSISTENCE_PATH = null;
private static final HFileContext CONTEXT = new HFileContextBuilder().build();
private BucketCache cache;
private static BucketCache create(int writerSize, int queueSize) throws IOException {
return new BucketCache(IO_ENGINE, CAPACITY_SIZE, BLOCK_SIZE, BLOCK_SIZE_ARRAY, writerSize,
queueSize, PERSISTENCE_PATH);
}
private static HFileBlock createBlock(int offset, int size) {
return new HFileBlock(BlockType.DATA, size, size, -1, ByteBuffer.allocate(size),
HFileBlock.FILL_HEADER, offset, 52, size, CONTEXT, HEAP);
}
private static BlockCacheKey createKey(String hfileName, long offset) {
return new BlockCacheKey(hfileName, offset);
}
private void disableWriter() {
if (cache != null) {
for (WriterThread wt : cache.writerThreads) {
wt.disableWriter();
wt.interrupt();
}
}
}
@Test
public void testBlockInRAMCache() throws IOException {
cache = create(1, 1000);
disableWriter();
try {
for (int i = 0; i < 10; i++) {
HFileBlock blk = createBlock(i, 1020);
BlockCacheKey key = createKey("testHFile-00", i);
assertEquals(1, blk.refCnt());
cache.cacheBlock(key, blk);
assertEquals(i + 1, cache.getBlockCount());
assertEquals(2, blk.refCnt());
Cacheable block = cache.getBlock(key, false, false, false);
try {
assertEquals(3, blk.refCnt());
assertEquals(3, block.refCnt());
assertEquals(blk, block);
} finally {
block.release();
}
assertEquals(2, blk.refCnt());
assertEquals(2, block.refCnt());
}
for (int i = 0; i < 10; i++) {
BlockCacheKey key = createKey("testHFile-00", i);
Cacheable blk = cache.getBlock(key, false, false, false);
assertEquals(3, blk.refCnt());
assertFalse(blk.release());
assertEquals(2, blk.refCnt());
assertTrue(cache.evictBlock(key));
assertEquals(1, blk.refCnt());
assertTrue(blk.release());
assertEquals(0, blk.refCnt());
}
} finally {
cache.shutdown();
}
}
private void waitUntilFlushedToCache(BlockCacheKey key) throws InterruptedException {
while (!cache.backingMap.containsKey(key) || cache.ramCache.containsKey(key)) {
Thread.sleep(100);
}
Thread.sleep(1000);
}
@Test
public void testBlockInBackingMap() throws Exception {
cache = create(1, 1000);
try {
HFileBlock blk = createBlock(200, 1020);
BlockCacheKey key = createKey("testHFile-00", 200);
cache.cacheBlock(key, blk);
waitUntilFlushedToCache(key);
assertEquals(1, blk.refCnt());
Cacheable block = cache.getBlock(key, false, false, false);
assertTrue(block.getMemoryType() == MemoryType.SHARED);
assertTrue(block instanceof HFileBlock);
assertEquals(2, block.refCnt());
block.retain();
assertEquals(3, block.refCnt());
Cacheable newBlock = cache.getBlock(key, false, false, false);
assertTrue(newBlock.getMemoryType() == MemoryType.SHARED);
assertTrue(newBlock instanceof HFileBlock);
assertEquals(4, newBlock.refCnt());
// release the newBlock
assertFalse(newBlock.release());
assertEquals(3, newBlock.refCnt());
assertEquals(3, block.refCnt());
// Evict the key
cache.evictBlock(key);
assertEquals(2, block.refCnt());
// Evict again, shouldn't change the refCnt.
cache.evictBlock(key);
assertEquals(2, block.refCnt());
assertFalse(block.release());
assertEquals(1, block.refCnt());
newBlock = cache.getBlock(key, false, false, false);
assertEquals(2, block.refCnt());
assertEquals(2, newBlock.refCnt());
// Release the block
assertFalse(block.release());
assertEquals(1, block.refCnt());
// Release the newBlock;
assertTrue(newBlock.release());
assertEquals(0, newBlock.refCnt());
} finally {
cache.shutdown();
}
}
@Test
public void testInBucketCache() throws IOException {
cache = create(1, 1000);
try {
HFileBlock blk = createBlock(200, 1020);
BlockCacheKey key = createKey("testHFile-00", 200);
cache.cacheBlock(key, blk);
assertTrue(blk.refCnt() == 1 || blk.refCnt() == 2);
Cacheable block1 = cache.getBlock(key, false, false, false);
assertTrue(block1.refCnt() >= 2);
Cacheable block2 = cache.getBlock(key, false, false, false);
assertTrue(block2.refCnt() >= 3);
cache.evictBlock(key);
assertTrue(blk.refCnt() >= 1);
assertTrue(block1.refCnt() >= 2);
assertTrue(block2.refCnt() >= 2);
// Get key again
Cacheable block3 = cache.getBlock(key, false, false, false);
if (block3 != null) {
assertTrue(block3.refCnt() >= 3);
assertFalse(block3.release());
}
blk.release();
boolean ret1 = block1.release();
boolean ret2 = block2.release();
assertTrue(ret1 || ret2);
assertEquals(0, blk.refCnt());
assertEquals(0, block1.refCnt());
assertEquals(0, block2.refCnt());
} finally {
cache.shutdown();
}
}
@Test
public void testMarkStaleAsEvicted() throws Exception {
cache = create(1, 1000);
try {
HFileBlock blk = createBlock(200, 1020);
BlockCacheKey key = createKey("testMarkStaleAsEvicted", 200);
cache.cacheBlock(key, blk);
waitUntilFlushedToCache(key);
assertEquals(1, blk.refCnt());
assertNotNull(cache.backingMap.get(key));
assertEquals(1, cache.backingMap.get(key).refCnt());
// RPC reference this cache.
Cacheable block1 = cache.getBlock(key, false, false, false);
assertEquals(2, block1.refCnt());
BucketEntry be1 = cache.backingMap.get(key);
assertNotNull(be1);
assertEquals(2, be1.refCnt());
// We've some RPC reference, so it won't have any effect.
assertFalse(be1.markStaleAsEvicted());
assertEquals(2, block1.refCnt());
assertEquals(2, cache.backingMap.get(key).refCnt());
// Release the RPC reference.
block1.release();
assertEquals(1, block1.refCnt());
assertEquals(1, cache.backingMap.get(key).refCnt());
// Mark the stale as evicted again, it'll do the de-allocation.
assertTrue(be1.markStaleAsEvicted());
assertEquals(0, block1.refCnt());
assertNull(cache.backingMap.get(key));
assertEquals(0, cache.size());
} finally {
cache.shutdown();
}
}
}

View File

@ -31,7 +31,6 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BucketEntry;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;

View File

@ -23,6 +23,7 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
@ -41,6 +42,39 @@ public class TestByteBufferIOEngine {
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestByteBufferIOEngine.class);
/**
* Override the {@link BucketEntry} so that we can set an arbitrary offset.
*/
private static class MockBucketEntry extends BucketEntry {
private long off;
MockBucketEntry(long offset, int length) {
super(offset & 0xFF00, length, 0, false);
this.off = offset;
}
@Override
long offset() {
return this.off;
}
}
private static BufferGrabbingDeserializer DESERIALIZER = new BufferGrabbingDeserializer();
static {
int id = CacheableDeserializerIdManager.registerDeserializer(DESERIALIZER);
DESERIALIZER.setIdentifier(id);
}
static BucketEntry createBucketEntry(long offset, int len) {
BucketEntry be = new MockBucketEntry(offset, len);
be.setDeserialiserReference(DESERIALIZER);
return be;
}
static ByteBuff getByteBuff(BucketEntry be) {
return ((BufferGrabbingDeserializer) be.deserializerReference()).buf;
}
@Test
public void testByteBufferIOEngine() throws Exception {
int capacity = 32 * 1024 * 1024; // 32 MB
@ -71,9 +105,9 @@ public class TestByteBufferIOEngine {
ioEngine.write(src, offset);
src.position(pos).limit(lim);
BufferGrabbingDeserializer deserializer = new BufferGrabbingDeserializer();
ioEngine.read(offset, blockSize, deserializer);
ByteBuff dst = deserializer.buf;
BucketEntry be = createBucketEntry(offset, blockSize);
ioEngine.read(be);
ByteBuff dst = getByteBuff(be);
Assert.assertEquals(src.remaining(), blockSize);
Assert.assertEquals(dst.remaining(), blockSize);
Assert.assertEquals(0, ByteBuff.compareTo(src, src.position(), src.remaining(), dst,
@ -85,10 +119,11 @@ public class TestByteBufferIOEngine {
/**
* A CacheableDeserializer implementation which just store reference to the {@link ByteBuff} to be
* deserialized. Use {@link #getDeserializedByteBuff()} to get this reference.
* deserialized.
*/
static class BufferGrabbingDeserializer implements CacheableDeserializer<Cacheable> {
private ByteBuff buf;
private int identifier;
@Override
public Cacheable deserialize(ByteBuff b) throws IOException {
@ -102,13 +137,13 @@ public class TestByteBufferIOEngine {
return null;
}
@Override
public int getDeserialiserIdentifier() {
return 0;
public void setIdentifier(int identifier) {
this.identifier = identifier;
}
public ByteBuff getDeserializedByteBuff() {
return this.buf;
@Override
public int getDeserialiserIdentifier() {
return identifier;
}
}
@ -151,9 +186,9 @@ public class TestByteBufferIOEngine {
ioEngine.write(src, offset);
src.position(pos).limit(lim);
BufferGrabbingDeserializer deserializer = new BufferGrabbingDeserializer();
ioEngine.read(offset, blockSize, deserializer);
ByteBuff dst = deserializer.buf;
BucketEntry be = createBucketEntry(offset, blockSize);
ioEngine.read(be);
ByteBuff dst = getByteBuff(be);
Assert.assertEquals(src.remaining(), blockSize);
Assert.assertEquals(dst.remaining(), blockSize);
Assert.assertEquals(0, ByteBuff.compareTo(src, src.position(), src.remaining(), dst,

View File

@ -17,10 +17,12 @@
*/
package org.apache.hadoop.hbase.io.hfile.bucket;
import static org.apache.hadoop.hbase.io.hfile.bucket.TestByteBufferIOEngine.createBucketEntry;
import static org.apache.hadoop.hbase.io.hfile.bucket.TestByteBufferIOEngine.getByteBuff;
import java.io.File;
import java.io.IOException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.io.hfile.bucket.TestByteBufferIOEngine.BufferGrabbingDeserializer;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
@ -57,9 +59,9 @@ public class TestExclusiveMemoryMmapEngine {
src.position(pos).limit(lim);
// read
BufferGrabbingDeserializer deserializer = new BufferGrabbingDeserializer();
fileMmapEngine.read(offset, len, deserializer);
ByteBuff dst = deserializer.getDeserializedByteBuff();
BucketEntry be = createBucketEntry(offset, len);
fileMmapEngine.read(be);
ByteBuff dst = getByteBuff(be);
Assert.assertEquals(src.remaining(), len);
Assert.assertEquals(dst.remaining(), len);

View File

@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.io.hfile.bucket;
import static org.apache.hadoop.hbase.io.hfile.bucket.TestByteBufferIOEngine.createBucketEntry;
import static org.apache.hadoop.hbase.io.hfile.bucket.TestByteBufferIOEngine.getByteBuff;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
@ -29,7 +31,6 @@ import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.io.hfile.bucket.TestByteBufferIOEngine.BufferGrabbingDeserializer;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
@ -110,9 +111,10 @@ public class TestFileIOEngine {
data1[j] = (byte) (Math.random() * 255);
}
fileIOEngine.write(ByteBuffer.wrap(data1), offset);
BufferGrabbingDeserializer deserializer = new BufferGrabbingDeserializer();
fileIOEngine.read(offset, len, deserializer);
ByteBuff data2 = deserializer.getDeserializedByteBuff();
BucketEntry be = createBucketEntry(offset, len);
fileIOEngine.read(be);
ByteBuff data2 = getByteBuff(be);
assertArrayEquals(data1, data2.array());
}
}
@ -122,9 +124,9 @@ public class TestFileIOEngine {
byte[] data1 = new byte[0];
fileIOEngine.write(ByteBuffer.wrap(data1), 0);
BufferGrabbingDeserializer deserializer = new BufferGrabbingDeserializer();
fileIOEngine.read(0, 0, deserializer);
ByteBuff data2 = deserializer.getDeserializedByteBuff();
BucketEntry be = createBucketEntry(0, 0);
fileIOEngine.read(be);
ByteBuff data2 = getByteBuff(be);
assertArrayEquals(data1, data2.array());
}
@ -140,9 +142,9 @@ public class TestFileIOEngine {
fileIOEngine.write(src, offset);
src.position(pos).limit(lim);
BufferGrabbingDeserializer deserializer = new BufferGrabbingDeserializer();
fileIOEngine.read(offset, len, deserializer);
ByteBuff dst = deserializer.getDeserializedByteBuff();
BucketEntry be = createBucketEntry(offset, len);
fileIOEngine.read(be);
ByteBuff dst = getByteBuff(be);
Assert.assertEquals(src.remaining(), len);
Assert.assertEquals(dst.remaining(), len);