HBASE-22612 Address the final overview reviewing comments of HBASE-21879
This commit is contained in:
parent
af0e23c359
commit
6a3aa789d8
|
@ -26,8 +26,6 @@ import java.util.concurrent.ConcurrentLinkedQueue;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.concurrent.atomic.LongAdder;
|
import java.util.concurrent.atomic.LongAdder;
|
||||||
|
|
||||||
import sun.nio.ch.DirectBuffer;
|
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.nio.ByteBuff;
|
import org.apache.hadoop.hbase.nio.ByteBuff;
|
||||||
|
@ -38,6 +36,8 @@ import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
|
import sun.nio.ch.DirectBuffer;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* ByteBuffAllocator is used for allocating/freeing the ByteBuffers from/to NIO ByteBuffer pool, and
|
* ByteBuffAllocator is used for allocating/freeing the ByteBuffers from/to NIO ByteBuffer pool, and
|
||||||
* it provide high-level interfaces for upstream. when allocating desired memory size, it will
|
* it provide high-level interfaces for upstream. when allocating desired memory size, it will
|
||||||
|
|
|
@ -41,7 +41,7 @@ import org.slf4j.LoggerFactory;
|
||||||
public class ByteBufferListOutputStream extends ByteBufferOutputStream {
|
public class ByteBufferListOutputStream extends ByteBufferOutputStream {
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(ByteBufferListOutputStream.class);
|
private static final Logger LOG = LoggerFactory.getLogger(ByteBufferListOutputStream.class);
|
||||||
|
|
||||||
private ByteBuffAllocator allocator;
|
private final ByteBuffAllocator allocator;
|
||||||
// Keep track of the BBs where bytes written to. We will first try to get a BB from the pool. If
|
// Keep track of the BBs where bytes written to. We will first try to get a BB from the pool. If
|
||||||
// it is not available will make a new one our own and keep writing to that. We keep track of all
|
// it is not available will make a new one our own and keep writing to that. We keep track of all
|
||||||
// the BBs that we got from pool, separately so that on closeAndPutbackBuffers, we can make sure
|
// the BBs that we got from pool, separately so that on closeAndPutbackBuffers, we can make sure
|
||||||
|
|
|
@ -193,8 +193,8 @@ public class HFileContext implements HeapSize, Cloneable {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* HeapSize implementation. NOTE : The heapsize should be altered as and when new state variable
|
* HeapSize implementation. NOTE : The heap size should be altered when new state variable are
|
||||||
* are added
|
* added.
|
||||||
* @return heap size of the HFileContext
|
* @return heap size of the HFileContext
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -26,7 +26,7 @@ import org.apache.hbase.thirdparty.io.netty.util.ReferenceCounted;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Maintain an reference count integer inside to track life cycle of {@link ByteBuff}, if the
|
* Maintain an reference count integer inside to track life cycle of {@link ByteBuff}, if the
|
||||||
* reference count become 0, it'll call {@link Recycler#free()} once.
|
* reference count become 0, it'll call {@link Recycler#free()} exactly once.
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class RefCnt extends AbstractReferenceCounted {
|
public class RefCnt extends AbstractReferenceCounted {
|
||||||
|
@ -36,8 +36,8 @@ public class RefCnt extends AbstractReferenceCounted {
|
||||||
/**
|
/**
|
||||||
* Create an {@link RefCnt} with an initial reference count = 1. If the reference count become
|
* Create an {@link RefCnt} with an initial reference count = 1. If the reference count become
|
||||||
* zero, the recycler will do nothing. Usually, an Heap {@link ByteBuff} will use this kind of
|
* zero, the recycler will do nothing. Usually, an Heap {@link ByteBuff} will use this kind of
|
||||||
* refCnt to track its life cycle, it help to abstract the code path although it's meaningless to
|
* refCnt to track its life cycle, it help to abstract the code path although it's not really
|
||||||
* use an refCnt for heap ByteBuff.
|
* needed to track on heap ByteBuff.
|
||||||
*/
|
*/
|
||||||
public static RefCnt create() {
|
public static RefCnt create() {
|
||||||
return new RefCnt(ByteBuffAllocator.NONE);
|
return new RefCnt(ByteBuffAllocator.NONE);
|
||||||
|
|
|
@ -920,7 +920,8 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
|
||||||
}
|
}
|
||||||
if (block.getOffset() < 0) {
|
if (block.getOffset() < 0) {
|
||||||
releaseIfNotCurBlock(block);
|
releaseIfNotCurBlock(block);
|
||||||
throw new IOException("Invalid block file offset: " + block + ", path=" + reader.getPath());
|
throw new IOException(
|
||||||
|
"Invalid block file offset: " + block + ", path=" + reader.getPath());
|
||||||
}
|
}
|
||||||
// We are reading the next block without block type validation, because
|
// We are reading the next block without block type validation, because
|
||||||
// it might turn out to be a non-data block.
|
// it might turn out to be a non-data block.
|
||||||
|
@ -1131,7 +1132,8 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
|
||||||
isCompaction, true, BlockType.DATA, getEffectiveDataBlockEncoding());
|
isCompaction, true, BlockType.DATA, getEffectiveDataBlockEncoding());
|
||||||
if (newBlock.getOffset() < 0) {
|
if (newBlock.getOffset() < 0) {
|
||||||
releaseIfNotCurBlock(newBlock);
|
releaseIfNotCurBlock(newBlock);
|
||||||
throw new IOException("Invalid block offset: " + newBlock.getOffset() + ", path=" + reader.getPath());
|
throw new IOException(
|
||||||
|
"Invalid block offset: " + newBlock.getOffset() + ", path=" + reader.getPath());
|
||||||
}
|
}
|
||||||
updateCurrentBlock(newBlock);
|
updateCurrentBlock(newBlock);
|
||||||
}
|
}
|
||||||
|
@ -1339,7 +1341,8 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
|
||||||
// schema definition change.
|
// schema definition change.
|
||||||
LOG.info("Evicting cached block with key " + cacheKey
|
LOG.info("Evicting cached block with key " + cacheKey
|
||||||
+ " because of a data block encoding mismatch" + "; expected: "
|
+ " because of a data block encoding mismatch" + "; expected: "
|
||||||
+ expectedDataBlockEncoding + ", actual: " + actualDataBlockEncoding + ", path=" + path);
|
+ expectedDataBlockEncoding + ", actual: " + actualDataBlockEncoding + ", path="
|
||||||
|
+ path);
|
||||||
// This is an error scenario. so here we need to release the block.
|
// This is an error scenario. so here we need to release the block.
|
||||||
cachedBlock.release();
|
cachedBlock.release();
|
||||||
cache.evictBlock(cacheKey);
|
cache.evictBlock(cacheKey);
|
||||||
|
@ -1662,8 +1665,8 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
|
||||||
short dataBlockEncoderId = newBlock.getDataBlockEncodingId();
|
short dataBlockEncoderId = newBlock.getDataBlockEncodingId();
|
||||||
if (!DataBlockEncoding.isCorrectEncoder(dataBlockEncoder, dataBlockEncoderId)) {
|
if (!DataBlockEncoding.isCorrectEncoder(dataBlockEncoder, dataBlockEncoderId)) {
|
||||||
String encoderCls = dataBlockEncoder.getClass().getName();
|
String encoderCls = dataBlockEncoder.getClass().getName();
|
||||||
throw new CorruptHFileException(
|
throw new CorruptHFileException("Encoder " + encoderCls
|
||||||
"Encoder " + encoderCls + " doesn't support data block encoding "
|
+ " doesn't support data block encoding "
|
||||||
+ DataBlockEncoding.getNameFromId(dataBlockEncoderId) + ",path=" + reader.getPath());
|
+ DataBlockEncoding.getNameFromId(dataBlockEncoderId) + ",path=" + reader.getPath());
|
||||||
}
|
}
|
||||||
updateCurrBlockRef(newBlock);
|
updateCurrBlockRef(newBlock);
|
||||||
|
|
|
@ -153,8 +153,14 @@ public class LruBlockCache implements FirstLevelBlockCache {
|
||||||
private static final String LRU_MAX_BLOCK_SIZE = "hbase.lru.max.block.size";
|
private static final String LRU_MAX_BLOCK_SIZE = "hbase.lru.max.block.size";
|
||||||
private static final long DEFAULT_MAX_BLOCK_SIZE = 16L * 1024L * 1024L;
|
private static final long DEFAULT_MAX_BLOCK_SIZE = 16L * 1024L * 1024L;
|
||||||
|
|
||||||
/** Concurrent map (the cache) */
|
/**
|
||||||
private transient final Map<BlockCacheKey, LruCachedBlock> map;
|
* Defined the cache map as {@link ConcurrentHashMap} here, because in
|
||||||
|
* {@link LruBlockCache#getBlock}, we need to guarantee the atomicity of map#computeIfPresent
|
||||||
|
* (key, func). Besides, the func method must execute exactly once only when the key is present
|
||||||
|
* and under the lock context, otherwise the reference count will be messed up. Notice that the
|
||||||
|
* {@link java.util.concurrent.ConcurrentSkipListMap} can not guarantee that.
|
||||||
|
*/
|
||||||
|
private transient final ConcurrentHashMap<BlockCacheKey, LruCachedBlock> map;
|
||||||
|
|
||||||
/** Eviction lock (locked when eviction in process) */
|
/** Eviction lock (locked when eviction in process) */
|
||||||
private transient final ReentrantLock evictionLock = new ReentrantLock(true);
|
private transient final ReentrantLock evictionLock = new ReentrantLock(true);
|
||||||
|
|
|
@ -140,7 +140,7 @@ public class BucketCache implements BlockCache, HeapSize {
|
||||||
transient final RAMCache ramCache;
|
transient final RAMCache ramCache;
|
||||||
// In this map, store the block's meta data like offset, length
|
// In this map, store the block's meta data like offset, length
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
transient ConcurrentMap<BlockCacheKey, BucketEntry> backingMap;
|
transient ConcurrentHashMap<BlockCacheKey, BucketEntry> backingMap;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Flag if the cache is enabled or not... We shut it off if there are IO
|
* Flag if the cache is enabled or not... We shut it off if there are IO
|
||||||
|
@ -1524,7 +1524,16 @@ public class BucketCache implements BlockCache, HeapSize {
|
||||||
* Wrapped the delegate ConcurrentMap with maintaining its block's reference count.
|
* Wrapped the delegate ConcurrentMap with maintaining its block's reference count.
|
||||||
*/
|
*/
|
||||||
static class RAMCache {
|
static class RAMCache {
|
||||||
final ConcurrentMap<BlockCacheKey, RAMQueueEntry> delegate = new ConcurrentHashMap<>();
|
/**
|
||||||
|
* Defined the map as {@link ConcurrentHashMap} explicitly here, because in
|
||||||
|
* {@link RAMCache#get(BlockCacheKey)} and
|
||||||
|
* {@link RAMCache#putIfAbsent(BlockCacheKey, RAMQueueEntry)} , we need to guarantee the
|
||||||
|
* atomicity of map#computeIfPresent(key, func) and map#putIfAbsent(key, func). Besides, the
|
||||||
|
* func method can execute exactly once only when the key is present(or absent) and under the
|
||||||
|
* lock context. Otherwise, the reference count of block will be messed up. Notice that the
|
||||||
|
* {@link java.util.concurrent.ConcurrentSkipListMap} can not guarantee that.
|
||||||
|
*/
|
||||||
|
final ConcurrentHashMap<BlockCacheKey, RAMQueueEntry> delegate = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
public boolean containsKey(BlockCacheKey key) {
|
public boolean containsKey(BlockCacheKey key) {
|
||||||
return delegate.containsKey(key);
|
return delegate.containsKey(key);
|
||||||
|
|
|
@ -223,7 +223,7 @@ public class CacheTestUtils {
|
||||||
|
|
||||||
public static class ByteArrayCacheable implements Cacheable {
|
public static class ByteArrayCacheable implements Cacheable {
|
||||||
|
|
||||||
static final CacheableDeserializer<Cacheable> blockDeserializer =
|
private static final CacheableDeserializer<Cacheable> blockDeserializer =
|
||||||
new CacheableDeserializer<Cacheable>() {
|
new CacheableDeserializer<Cacheable>() {
|
||||||
@Override
|
@Override
|
||||||
public int getDeserializerIdentifier() {
|
public int getDeserializerIdentifier() {
|
||||||
|
@ -234,7 +234,7 @@ public class CacheTestUtils {
|
||||||
public Cacheable deserialize(ByteBuff b, ByteBuffAllocator alloc) throws IOException {
|
public Cacheable deserialize(ByteBuff b, ByteBuffAllocator alloc) throws IOException {
|
||||||
int len = b.getInt();
|
int len = b.getInt();
|
||||||
Thread.yield();
|
Thread.yield();
|
||||||
byte buf[] = new byte[len];
|
byte[] buf = new byte[len];
|
||||||
b.get(buf);
|
b.get(buf);
|
||||||
return new ByteArrayCacheable(buf);
|
return new ByteArrayCacheable(buf);
|
||||||
}
|
}
|
||||||
|
|
|
@ -51,9 +51,9 @@ import org.apache.hadoop.hbase.io.hfile.HFileContext;
|
||||||
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
|
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
|
||||||
import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.BucketSizeInfo;
|
import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.BucketSizeInfo;
|
||||||
import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.IndexStatistics;
|
import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.IndexStatistics;
|
||||||
import org.apache.hadoop.hbase.nio.ByteBuff;
|
|
||||||
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMCache;
|
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMCache;
|
||||||
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry;
|
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry;
|
||||||
|
import org.apache.hadoop.hbase.nio.ByteBuff;
|
||||||
import org.apache.hadoop.hbase.testclassification.IOTests;
|
import org.apache.hadoop.hbase.testclassification.IOTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
|
|
|
@ -101,11 +101,13 @@ public class TestMobWithByteBuffAllocator {
|
||||||
int rows = 0;
|
int rows = 0;
|
||||||
try (Table table = UTIL.getConnection().getTable(tableName)) {
|
try (Table table = UTIL.getConnection().getTable(tableName)) {
|
||||||
try (ResultScanner scanner = table.getScanner(new Scan().setReversed(true))) {
|
try (ResultScanner scanner = table.getScanner(new Scan().setReversed(true))) {
|
||||||
for (Result res; (res = scanner.next()) != null;) {
|
Result res = scanner.next();
|
||||||
|
while (res != null) {
|
||||||
rows++;
|
rows++;
|
||||||
for (Cell cell : res.listCells()) {
|
for (Cell cell : res.listCells()) {
|
||||||
Assert.assertTrue(CellUtil.cloneValue(cell).length > 0);
|
Assert.assertTrue(CellUtil.cloneValue(cell).length > 0);
|
||||||
}
|
}
|
||||||
|
res = scanner.next();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue