HBASE-19187 Remove option to create on heap bucket cache.
This commit is contained in:
parent
30f55f2316
commit
3ee76f8573
|
@ -31,9 +31,8 @@ public interface ByteBufferAllocator {
|
||||||
/**
|
/**
|
||||||
* Allocates a bytebuffer
|
* Allocates a bytebuffer
|
||||||
* @param size the size of the bytebuffer
|
* @param size the size of the bytebuffer
|
||||||
* @param directByteBuffer indicator to create a direct bytebuffer
|
|
||||||
* @return the bytebuffer that is created
|
* @return the bytebuffer that is created
|
||||||
* @throws IOException exception thrown if there is an error while creating the ByteBuffer
|
* @throws IOException exception thrown if there is an error while creating the ByteBuffer
|
||||||
*/
|
*/
|
||||||
ByteBuffer allocate(long size, boolean directByteBuffer) throws IOException;
|
ByteBuffer allocate(long size) throws IOException;
|
||||||
}
|
}
|
||||||
|
|
|
@ -59,11 +59,10 @@ public class ByteBufferArray {
|
||||||
* of the array bounds for the last byte(see {@link ByteBufferArray#multiple}),
|
* of the array bounds for the last byte(see {@link ByteBufferArray#multiple}),
|
||||||
* we will allocate one additional buffer with capacity 0;
|
* we will allocate one additional buffer with capacity 0;
|
||||||
* @param capacity total size of the byte buffer array
|
* @param capacity total size of the byte buffer array
|
||||||
* @param directByteBuffer true if we allocate direct buffer
|
|
||||||
* @param allocator the ByteBufferAllocator that will create the buffers
|
* @param allocator the ByteBufferAllocator that will create the buffers
|
||||||
* @throws IOException throws IOException if there is an exception thrown by the allocator
|
* @throws IOException throws IOException if there is an exception thrown by the allocator
|
||||||
*/
|
*/
|
||||||
public ByteBufferArray(long capacity, boolean directByteBuffer, ByteBufferAllocator allocator)
|
public ByteBufferArray(long capacity, ByteBufferAllocator allocator)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
this.bufferSize = DEFAULT_BUFFER_SIZE;
|
this.bufferSize = DEFAULT_BUFFER_SIZE;
|
||||||
if (this.bufferSize > (capacity / 16))
|
if (this.bufferSize > (capacity / 16))
|
||||||
|
@ -71,13 +70,13 @@ public class ByteBufferArray {
|
||||||
this.bufferCount = (int) (roundUp(capacity, bufferSize) / bufferSize);
|
this.bufferCount = (int) (roundUp(capacity, bufferSize) / bufferSize);
|
||||||
LOG.info("Allocating buffers total=" + StringUtils.byteDesc(capacity)
|
LOG.info("Allocating buffers total=" + StringUtils.byteDesc(capacity)
|
||||||
+ ", sizePerBuffer=" + StringUtils.byteDesc(bufferSize) + ", count="
|
+ ", sizePerBuffer=" + StringUtils.byteDesc(bufferSize) + ", count="
|
||||||
+ bufferCount + ", direct=" + directByteBuffer);
|
+ bufferCount);
|
||||||
buffers = new ByteBuffer[bufferCount + 1];
|
buffers = new ByteBuffer[bufferCount + 1];
|
||||||
createBuffers(directByteBuffer, allocator);
|
createBuffers(allocator);
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
void createBuffers(boolean directByteBuffer, ByteBufferAllocator allocator)
|
void createBuffers(ByteBufferAllocator allocator)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
int threadCount = getThreadCount();
|
int threadCount = getThreadCount();
|
||||||
ExecutorService service = new ThreadPoolExecutor(threadCount, threadCount, 0L,
|
ExecutorService service = new ThreadPoolExecutor(threadCount, threadCount, 0L,
|
||||||
|
@ -90,7 +89,7 @@ public class ByteBufferArray {
|
||||||
// Last thread will have to deal with a different number of buffers
|
// Last thread will have to deal with a different number of buffers
|
||||||
int buffersToCreate = (i == threadCount - 1) ? lastThreadCount : perThreadCount;
|
int buffersToCreate = (i == threadCount - 1) ? lastThreadCount : perThreadCount;
|
||||||
futures[i] = service.submit(
|
futures[i] = service.submit(
|
||||||
new BufferCreatorCallable(bufferSize, directByteBuffer, buffersToCreate, allocator));
|
new BufferCreatorCallable(bufferSize, buffersToCreate, allocator));
|
||||||
}
|
}
|
||||||
int bufferIndex = 0;
|
int bufferIndex = 0;
|
||||||
for (Future<ByteBuffer[]> future : futures) {
|
for (Future<ByteBuffer[]> future : futures) {
|
||||||
|
@ -122,14 +121,11 @@ public class ByteBufferArray {
|
||||||
*/
|
*/
|
||||||
private static class BufferCreatorCallable implements Callable<ByteBuffer[]> {
|
private static class BufferCreatorCallable implements Callable<ByteBuffer[]> {
|
||||||
private final int bufferCapacity;
|
private final int bufferCapacity;
|
||||||
private final boolean directByteBuffer;
|
|
||||||
private final int bufferCount;
|
private final int bufferCount;
|
||||||
private final ByteBufferAllocator allocator;
|
private final ByteBufferAllocator allocator;
|
||||||
|
|
||||||
BufferCreatorCallable(int bufferCapacity, boolean directByteBuffer, int bufferCount,
|
BufferCreatorCallable(int bufferCapacity, int bufferCount, ByteBufferAllocator allocator) {
|
||||||
ByteBufferAllocator allocator) {
|
|
||||||
this.bufferCapacity = bufferCapacity;
|
this.bufferCapacity = bufferCapacity;
|
||||||
this.directByteBuffer = directByteBuffer;
|
|
||||||
this.bufferCount = bufferCount;
|
this.bufferCount = bufferCount;
|
||||||
this.allocator = allocator;
|
this.allocator = allocator;
|
||||||
}
|
}
|
||||||
|
@ -138,7 +134,7 @@ public class ByteBufferArray {
|
||||||
public ByteBuffer[] call() throws Exception {
|
public ByteBuffer[] call() throws Exception {
|
||||||
ByteBuffer[] buffers = new ByteBuffer[this.bufferCount];
|
ByteBuffer[] buffers = new ByteBuffer[this.bufferCount];
|
||||||
for (int i = 0; i < this.bufferCount; i++) {
|
for (int i = 0; i < this.bufferCount; i++) {
|
||||||
buffers[i] = allocator.allocate(this.bufferCapacity, this.directByteBuffer);
|
buffers[i] = allocator.allocate(this.bufferCapacity);
|
||||||
}
|
}
|
||||||
return buffers;
|
return buffers;
|
||||||
}
|
}
|
||||||
|
|
|
@ -894,9 +894,10 @@ possible configurations would overwhelm and obscure the important.
|
||||||
<property>
|
<property>
|
||||||
<name>hbase.bucketcache.ioengine</name>
|
<name>hbase.bucketcache.ioengine</name>
|
||||||
<value></value>
|
<value></value>
|
||||||
<description>Where to store the contents of the bucketcache. One of: heap,
|
<description>Where to store the contents of the bucketcache. One of: offheap,
|
||||||
offheap, or file. If a file, set it to file:PATH_TO_FILE. See
|
file, files or mmap. If a file or files, set it to file(s):PATH_TO_FILE.
|
||||||
http://hbase.apache.org/book.html#offheap.blockcache for more information.
|
mmap means the content will be in an mmaped file. Use mmap:PATH_TO_FILE.
|
||||||
|
See http://hbase.apache.org/book.html#offheap.blockcache for more information.
|
||||||
</description>
|
</description>
|
||||||
</property>
|
</property>
|
||||||
<property>
|
<property>
|
||||||
|
|
|
@ -38,16 +38,11 @@ public class TestByteBufferArray {
|
||||||
int capacity = 4 * 1024 * 1024;
|
int capacity = 4 * 1024 * 1024;
|
||||||
ByteBufferAllocator allocator = new ByteBufferAllocator() {
|
ByteBufferAllocator allocator = new ByteBufferAllocator() {
|
||||||
@Override
|
@Override
|
||||||
public ByteBuffer allocate(long size, boolean directByteBuffer)
|
public ByteBuffer allocate(long size) throws IOException {
|
||||||
throws IOException {
|
|
||||||
if (directByteBuffer) {
|
|
||||||
return ByteBuffer.allocateDirect((int) size);
|
return ByteBuffer.allocateDirect((int) size);
|
||||||
} else {
|
|
||||||
return ByteBuffer.allocate((int) size);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
ByteBufferArray array = new ByteBufferArray(capacity, false, allocator);
|
ByteBufferArray array = new ByteBufferArray(capacity, allocator);
|
||||||
ByteBuff subBuf = array.asSubByteBuff(0, capacity);
|
ByteBuff subBuf = array.asSubByteBuff(0, capacity);
|
||||||
subBuf.position(capacity - 1);// Position to the last byte
|
subBuf.position(capacity - 1);// Position to the last byte
|
||||||
assertTrue(subBuf.hasRemaining());
|
assertTrue(subBuf.hasRemaining());
|
||||||
|
@ -61,15 +56,11 @@ public class TestByteBufferArray {
|
||||||
int capacity = 470 * 1021 * 1023;
|
int capacity = 470 * 1021 * 1023;
|
||||||
ByteBufferAllocator allocator = new ByteBufferAllocator() {
|
ByteBufferAllocator allocator = new ByteBufferAllocator() {
|
||||||
@Override
|
@Override
|
||||||
public ByteBuffer allocate(long size, boolean directByteBuffer) throws IOException {
|
public ByteBuffer allocate(long size) throws IOException {
|
||||||
if (directByteBuffer) {
|
|
||||||
return ByteBuffer.allocateDirect((int) size);
|
return ByteBuffer.allocateDirect((int) size);
|
||||||
} else {
|
|
||||||
return ByteBuffer.allocate((int) size);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
ByteBufferArray array = new ByteBufferArray(capacity, false, allocator);
|
ByteBufferArray array = new ByteBufferArray(capacity, allocator);
|
||||||
assertEquals(119, array.buffers.length);
|
assertEquals(119, array.buffers.length);
|
||||||
for (int i = 0; i < array.buffers.length; i++) {
|
for (int i = 0; i < array.buffers.length; i++) {
|
||||||
if (i == array.buffers.length - 1) {
|
if (i == array.buffers.length - 1) {
|
||||||
|
@ -84,19 +75,15 @@ public class TestByteBufferArray {
|
||||||
public void testByteBufferCreation1() throws Exception {
|
public void testByteBufferCreation1() throws Exception {
|
||||||
ByteBufferAllocator allocator = new ByteBufferAllocator() {
|
ByteBufferAllocator allocator = new ByteBufferAllocator() {
|
||||||
@Override
|
@Override
|
||||||
public ByteBuffer allocate(long size, boolean directByteBuffer) throws IOException {
|
public ByteBuffer allocate(long size) throws IOException {
|
||||||
if (directByteBuffer) {
|
|
||||||
return ByteBuffer.allocateDirect((int) size);
|
return ByteBuffer.allocateDirect((int) size);
|
||||||
} else {
|
|
||||||
return ByteBuffer.allocate((int) size);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
ByteBufferArray array = new DummyByteBufferArray(7 * 1024 * 1024, false, allocator);
|
ByteBufferArray array = new DummyByteBufferArray(7 * 1024 * 1024, allocator);
|
||||||
// overwrite
|
// overwrite
|
||||||
array.bufferCount = 25;
|
array.bufferCount = 25;
|
||||||
array.buffers = new ByteBuffer[array.bufferCount + 1];
|
array.buffers = new ByteBuffer[array.bufferCount + 1];
|
||||||
array.createBuffers(true, allocator);
|
array.createBuffers(allocator);
|
||||||
for (int i = 0; i < array.buffers.length; i++) {
|
for (int i = 0; i < array.buffers.length; i++) {
|
||||||
if (i == array.buffers.length - 1) {
|
if (i == array.buffers.length - 1) {
|
||||||
assertEquals(array.buffers[i].capacity(), 0);
|
assertEquals(array.buffers[i].capacity(), 0);
|
||||||
|
@ -108,9 +95,8 @@ public class TestByteBufferArray {
|
||||||
|
|
||||||
private static class DummyByteBufferArray extends ByteBufferArray {
|
private static class DummyByteBufferArray extends ByteBufferArray {
|
||||||
|
|
||||||
public DummyByteBufferArray(long capacity, boolean directByteBuffer,
|
public DummyByteBufferArray(long capacity, ByteBufferAllocator allocator) throws IOException {
|
||||||
ByteBufferAllocator allocator) throws IOException {
|
super(capacity, allocator);
|
||||||
super(capacity, directByteBuffer, allocator);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -381,14 +381,12 @@ public class BucketCache implements BlockCache, HeapSize {
|
||||||
.split(FileIOEngine.FILE_DELIMITER);
|
.split(FileIOEngine.FILE_DELIMITER);
|
||||||
return new FileIOEngine(capacity, persistencePath != null, filePaths);
|
return new FileIOEngine(capacity, persistencePath != null, filePaths);
|
||||||
} else if (ioEngineName.startsWith("offheap")) {
|
} else if (ioEngineName.startsWith("offheap")) {
|
||||||
return new ByteBufferIOEngine(capacity, true);
|
return new ByteBufferIOEngine(capacity);
|
||||||
} else if (ioEngineName.startsWith("heap")) {
|
|
||||||
return new ByteBufferIOEngine(capacity, false);
|
|
||||||
} else if (ioEngineName.startsWith("mmap:")) {
|
} else if (ioEngineName.startsWith("mmap:")) {
|
||||||
return new FileMmapEngine(ioEngineName.substring(5), capacity);
|
return new FileMmapEngine(ioEngineName.substring(5), capacity);
|
||||||
} else {
|
} else {
|
||||||
throw new IllegalArgumentException(
|
throw new IllegalArgumentException(
|
||||||
"Don't understand io engine name for cache - prefix with file:, heap or offheap");
|
"Don't understand io engine name for cache- prefix with file:, files:, mmap: or offheap");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -68,36 +68,28 @@ So said all these, when we read a block it may be possible that the bytes of tha
|
||||||
public class ByteBufferIOEngine implements IOEngine {
|
public class ByteBufferIOEngine implements IOEngine {
|
||||||
private ByteBufferArray bufferArray;
|
private ByteBufferArray bufferArray;
|
||||||
private final long capacity;
|
private final long capacity;
|
||||||
private final boolean direct;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Construct the ByteBufferIOEngine with the given capacity
|
* Construct the ByteBufferIOEngine with the given capacity
|
||||||
* @param capacity
|
* @param capacity
|
||||||
* @param direct true if allocate direct buffer
|
|
||||||
* @throws IOException ideally here no exception to be thrown from the allocator
|
* @throws IOException ideally here no exception to be thrown from the allocator
|
||||||
*/
|
*/
|
||||||
public ByteBufferIOEngine(long capacity, boolean direct)
|
public ByteBufferIOEngine(long capacity)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
this.capacity = capacity;
|
this.capacity = capacity;
|
||||||
this.direct = direct;
|
|
||||||
ByteBufferAllocator allocator = new ByteBufferAllocator() {
|
ByteBufferAllocator allocator = new ByteBufferAllocator() {
|
||||||
@Override
|
@Override
|
||||||
public ByteBuffer allocate(long size, boolean directByteBuffer)
|
public ByteBuffer allocate(long size) throws IOException {
|
||||||
throws IOException {
|
|
||||||
if (directByteBuffer) {
|
|
||||||
return ByteBuffer.allocateDirect((int) size);
|
return ByteBuffer.allocateDirect((int) size);
|
||||||
} else {
|
|
||||||
return ByteBuffer.allocate((int) size);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
bufferArray = new ByteBufferArray(capacity, direct, allocator);
|
bufferArray = new ByteBufferArray(capacity, allocator);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "ioengine=" + this.getClass().getSimpleName() + ", capacity=" +
|
return "ioengine=" + this.getClass().getSimpleName() + ", capacity=" +
|
||||||
String.format("%,d", this.capacity) + ", direct=" + this.direct;
|
String.format("%,d", this.capacity);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -71,20 +71,14 @@ public class FileMmapEngine implements IOEngine {
|
||||||
ByteBufferAllocator allocator = new ByteBufferAllocator() {
|
ByteBufferAllocator allocator = new ByteBufferAllocator() {
|
||||||
int pos = 0;
|
int pos = 0;
|
||||||
@Override
|
@Override
|
||||||
public ByteBuffer allocate(long size, boolean directByteBuffer) throws IOException {
|
public ByteBuffer allocate(long size) throws IOException {
|
||||||
ByteBuffer buffer = null;
|
ByteBuffer buffer = fileChannel.map(java.nio.channels.FileChannel.MapMode.READ_WRITE,
|
||||||
if (directByteBuffer) {
|
pos * size, size);
|
||||||
buffer = fileChannel.map(java.nio.channels.FileChannel.MapMode.READ_WRITE, pos * size,
|
|
||||||
size);
|
|
||||||
} else {
|
|
||||||
throw new IllegalArgumentException(
|
|
||||||
"Only Direct Bytebuffers allowed with FileMMap engine");
|
|
||||||
}
|
|
||||||
pos++;
|
pos++;
|
||||||
return buffer;
|
return buffer;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
bufferArray = new ByteBufferArray(fileSize, true, allocator);
|
bufferArray = new ByteBufferArray(fileSize, allocator);
|
||||||
}
|
}
|
||||||
|
|
||||||
private long roundUp(long n, long to) {
|
private long roundUp(long n, long to) {
|
||||||
|
|
|
@ -205,29 +205,7 @@ public class MemorySizeUtil {
|
||||||
// L1 block cache is always on heap
|
// L1 block cache is always on heap
|
||||||
float l1CachePercent = conf.getFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY,
|
float l1CachePercent = conf.getFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY,
|
||||||
HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT);
|
HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT);
|
||||||
float l2CachePercent = getL2BlockCacheHeapPercent(conf);
|
return l1CachePercent;
|
||||||
return l1CachePercent + l2CachePercent;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @param conf
|
|
||||||
* @return The on heap size for L2 block cache.
|
|
||||||
*/
|
|
||||||
public static float getL2BlockCacheHeapPercent(Configuration conf) {
|
|
||||||
float l2CachePercent = 0.0F;
|
|
||||||
String bucketCacheIOEngineName = conf.get(HConstants.BUCKET_CACHE_IOENGINE_KEY, null);
|
|
||||||
// L2 block cache can be on heap when IOEngine is "heap"
|
|
||||||
if (bucketCacheIOEngineName != null && bucketCacheIOEngineName.startsWith("heap")) {
|
|
||||||
float bucketCachePercentage = conf.getFloat(HConstants.BUCKET_CACHE_SIZE_KEY, 0F);
|
|
||||||
long max = -1L;
|
|
||||||
final MemoryUsage usage = safeGetHeapMemoryUsage();
|
|
||||||
if (usage != null) {
|
|
||||||
max = usage.getMax();
|
|
||||||
}
|
|
||||||
l2CachePercent = bucketCachePercentage < 1 ? bucketCachePercentage
|
|
||||||
: (bucketCachePercentage * 1024 * 1024) / max;
|
|
||||||
}
|
|
||||||
return l2CachePercent;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -260,21 +238,13 @@ public class MemorySizeUtil {
|
||||||
* @return the number of bytes to use for bucket cache, negative if disabled.
|
* @return the number of bytes to use for bucket cache, negative if disabled.
|
||||||
*/
|
*/
|
||||||
public static long getBucketCacheSize(final Configuration conf) {
|
public static long getBucketCacheSize(final Configuration conf) {
|
||||||
final float bucketCachePercentage = conf.getFloat(HConstants.BUCKET_CACHE_SIZE_KEY, 0F);
|
// Size configured in MBs
|
||||||
long bucketCacheSize;
|
float bucketCacheSize = conf.getFloat(HConstants.BUCKET_CACHE_SIZE_KEY, 0F);
|
||||||
// Values < 1 are treated as % of heap
|
if (bucketCacheSize < 1) {
|
||||||
if (bucketCachePercentage < 1) {
|
throw new IllegalArgumentException("Bucket Cache should be minimum 1 MB in size."
|
||||||
long max = -1L;
|
+ "Configure 'hbase.bucketcache.size' with > 1 value");
|
||||||
final MemoryUsage usage = safeGetHeapMemoryUsage();
|
|
||||||
if (usage != null) {
|
|
||||||
max = usage.getMax();
|
|
||||||
}
|
}
|
||||||
bucketCacheSize = (long)(max * bucketCachePercentage);
|
return (long) (bucketCacheSize * 1024 * 1024);
|
||||||
// values >= 1 are treated as # of MiB
|
|
||||||
} else {
|
|
||||||
bucketCacheSize = (long)(bucketCachePercentage * 1024 * 1024);
|
|
||||||
}
|
|
||||||
return bucketCacheSize;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -73,7 +73,6 @@ public class HeapMemoryManager {
|
||||||
private float blockCachePercent;
|
private float blockCachePercent;
|
||||||
private float blockCachePercentMinRange;
|
private float blockCachePercentMinRange;
|
||||||
private float blockCachePercentMaxRange;
|
private float blockCachePercentMaxRange;
|
||||||
private float l2BlockCachePercent;
|
|
||||||
|
|
||||||
private float heapOccupancyPercent;
|
private float heapOccupancyPercent;
|
||||||
|
|
||||||
|
@ -183,8 +182,7 @@ public class HeapMemoryManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
int gml = (int) (globalMemStorePercentMaxRange * CONVERT_TO_PERCENTAGE);
|
int gml = (int) (globalMemStorePercentMaxRange * CONVERT_TO_PERCENTAGE);
|
||||||
this.l2BlockCachePercent = MemorySizeUtil.getL2BlockCacheHeapPercent(conf);
|
int bcul = (int) ((blockCachePercentMinRange) * CONVERT_TO_PERCENTAGE);
|
||||||
int bcul = (int) ((blockCachePercentMinRange + l2BlockCachePercent) * CONVERT_TO_PERCENTAGE);
|
|
||||||
if (CONVERT_TO_PERCENTAGE - (gml + bcul) < CLUSTER_MINIMUM_MEMORY_THRESHOLD) {
|
if (CONVERT_TO_PERCENTAGE - (gml + bcul) < CLUSTER_MINIMUM_MEMORY_THRESHOLD) {
|
||||||
throw new RuntimeException("Current heap configuration for MemStore and BlockCache exceeds "
|
throw new RuntimeException("Current heap configuration for MemStore and BlockCache exceeds "
|
||||||
+ "the threshold required for successful cluster operation. "
|
+ "the threshold required for successful cluster operation. "
|
||||||
|
@ -195,7 +193,7 @@ public class HeapMemoryManager {
|
||||||
+ blockCachePercentMinRange);
|
+ blockCachePercentMinRange);
|
||||||
}
|
}
|
||||||
gml = (int) (globalMemStorePercentMinRange * CONVERT_TO_PERCENTAGE);
|
gml = (int) (globalMemStorePercentMinRange * CONVERT_TO_PERCENTAGE);
|
||||||
bcul = (int) ((blockCachePercentMaxRange + l2BlockCachePercent) * CONVERT_TO_PERCENTAGE);
|
bcul = (int) ((blockCachePercentMaxRange) * CONVERT_TO_PERCENTAGE);
|
||||||
if (CONVERT_TO_PERCENTAGE - (gml + bcul) < CLUSTER_MINIMUM_MEMORY_THRESHOLD) {
|
if (CONVERT_TO_PERCENTAGE - (gml + bcul) < CLUSTER_MINIMUM_MEMORY_THRESHOLD) {
|
||||||
throw new RuntimeException("Current heap configuration for MemStore and BlockCache exceeds "
|
throw new RuntimeException("Current heap configuration for MemStore and BlockCache exceeds "
|
||||||
+ "the threshold required for successful cluster operation. "
|
+ "the threshold required for successful cluster operation. "
|
||||||
|
@ -361,7 +359,7 @@ public class HeapMemoryManager {
|
||||||
blockCacheSize = blockCachePercentMaxRange;
|
blockCacheSize = blockCachePercentMaxRange;
|
||||||
}
|
}
|
||||||
int gml = (int) (memstoreSize * CONVERT_TO_PERCENTAGE);
|
int gml = (int) (memstoreSize * CONVERT_TO_PERCENTAGE);
|
||||||
int bcul = (int) ((blockCacheSize + l2BlockCachePercent) * CONVERT_TO_PERCENTAGE);
|
int bcul = (int) ((blockCacheSize) * CONVERT_TO_PERCENTAGE);
|
||||||
if (CONVERT_TO_PERCENTAGE - (gml + bcul) < CLUSTER_MINIMUM_MEMORY_THRESHOLD) {
|
if (CONVERT_TO_PERCENTAGE - (gml + bcul) < CLUSTER_MINIMUM_MEMORY_THRESHOLD) {
|
||||||
LOG.info("Current heap configuration from HeapMemoryTuner exceeds "
|
LOG.info("Current heap configuration from HeapMemoryTuner exceeds "
|
||||||
+ "the threshold required for successful cluster operation. "
|
+ "the threshold required for successful cluster operation. "
|
||||||
|
|
|
@ -101,7 +101,7 @@ public class TestAvoidCellReferencesIntoShippedBlocks {
|
||||||
// tests
|
// tests
|
||||||
conf.setInt("hbase.regionserver.handler.count", 20);
|
conf.setInt("hbase.regionserver.handler.count", 20);
|
||||||
conf.setInt("hbase.bucketcache.size", 400);
|
conf.setInt("hbase.bucketcache.size", 400);
|
||||||
conf.setStrings("hbase.bucketcache.ioengine", "offheap");
|
conf.setStrings(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap");
|
||||||
conf.setInt("hbase.hstore.compactionThreshold", 7);
|
conf.setInt("hbase.hstore.compactionThreshold", 7);
|
||||||
conf.setFloat("hfile.block.cache.size", 0.2f);
|
conf.setFloat("hfile.block.cache.size", 0.2f);
|
||||||
conf.setFloat("hbase.regionserver.global.memstore.size", 0.1f);
|
conf.setFloat("hbase.regionserver.global.memstore.size", 0.1f);
|
||||||
|
|
|
@ -110,7 +110,7 @@ public class TestBlockEvictionFromClient {
|
||||||
// tests
|
// tests
|
||||||
conf.setInt("hbase.regionserver.handler.count", 20);
|
conf.setInt("hbase.regionserver.handler.count", 20);
|
||||||
conf.setInt("hbase.bucketcache.size", 400);
|
conf.setInt("hbase.bucketcache.size", 400);
|
||||||
conf.setStrings("hbase.bucketcache.ioengine", "offheap");
|
conf.setStrings(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap");
|
||||||
conf.setFloat("hfile.block.cache.size", 0.2f);
|
conf.setFloat("hfile.block.cache.size", 0.2f);
|
||||||
conf.setFloat("hbase.regionserver.global.memstore.size", 0.1f);
|
conf.setFloat("hbase.regionserver.global.memstore.size", 0.1f);
|
||||||
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);// do not retry
|
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);// do not retry
|
||||||
|
|
|
@ -299,12 +299,6 @@ public class TestCacheConfig {
|
||||||
doBucketCacheConfigTest();
|
doBucketCacheConfigTest();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testOnHeapBucketCacheConfig() {
|
|
||||||
this.conf.set(HConstants.BUCKET_CACHE_IOENGINE_KEY, "heap");
|
|
||||||
doBucketCacheConfigTest();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testFileBucketCacheConfig() throws IOException {
|
public void testFileBucketCacheConfig() throws IOException {
|
||||||
HBaseTestingUtility htu = new HBaseTestingUtility(this.conf);
|
HBaseTestingUtility htu = new HBaseTestingUtility(this.conf);
|
||||||
|
|
|
@ -33,6 +33,7 @@ import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hbase.ByteBufferKeyValue;
|
import org.apache.hadoop.hbase.ByteBufferKeyValue;
|
||||||
import org.apache.hadoop.hbase.Cell;
|
import org.apache.hadoop.hbase.Cell;
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.HRegionInfo;
|
import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
import org.apache.hadoop.hbase.PrivateCellUtil;
|
import org.apache.hadoop.hbase.PrivateCellUtil;
|
||||||
import org.apache.hadoop.hbase.KeyValue;
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
|
@ -71,16 +72,12 @@ public class TestScannerFromBucketCache {
|
||||||
// Test names
|
// Test names
|
||||||
private TableName tableName;
|
private TableName tableName;
|
||||||
|
|
||||||
private void setUp(boolean offheap, boolean useBucketCache) throws IOException {
|
private void setUp(boolean useBucketCache) throws IOException {
|
||||||
test_util = HBaseTestingUtility.createLocalHTU();
|
test_util = HBaseTestingUtility.createLocalHTU();
|
||||||
conf = test_util.getConfiguration();
|
conf = test_util.getConfiguration();
|
||||||
if (useBucketCache) {
|
if (useBucketCache) {
|
||||||
conf.setInt("hbase.bucketcache.size", 400);
|
conf.setInt("hbase.bucketcache.size", 400);
|
||||||
if (offheap) {
|
conf.setStrings(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap");
|
||||||
conf.setStrings("hbase.bucketcache.ioengine", "offheap");
|
|
||||||
} else {
|
|
||||||
conf.setStrings("hbase.bucketcache.ioengine", "heap");
|
|
||||||
}
|
|
||||||
conf.setInt("hbase.bucketcache.writer.threads", 10);
|
conf.setInt("hbase.bucketcache.writer.threads", 10);
|
||||||
conf.setFloat("hfile.block.cache.size", 0.2f);
|
conf.setFloat("hfile.block.cache.size", 0.2f);
|
||||||
conf.setFloat("hbase.regionserver.global.memstore.size", 0.1f);
|
conf.setFloat("hbase.regionserver.global.memstore.size", 0.1f);
|
||||||
|
@ -102,7 +99,7 @@ public class TestScannerFromBucketCache {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testBasicScanWithLRUCache() throws IOException {
|
public void testBasicScanWithLRUCache() throws IOException {
|
||||||
setUp(false, false);
|
setUp(false);
|
||||||
byte[] row1 = Bytes.toBytes("row1");
|
byte[] row1 = Bytes.toBytes("row1");
|
||||||
byte[] qf1 = Bytes.toBytes("qualifier1");
|
byte[] qf1 = Bytes.toBytes("qualifier1");
|
||||||
byte[] qf2 = Bytes.toBytes("qualifier2");
|
byte[] qf2 = Bytes.toBytes("qualifier2");
|
||||||
|
@ -140,7 +137,7 @@ public class TestScannerFromBucketCache {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testBasicScanWithOffheapBucketCache() throws IOException {
|
public void testBasicScanWithOffheapBucketCache() throws IOException {
|
||||||
setUp(true, true);
|
setUp(true);
|
||||||
byte[] row1 = Bytes.toBytes("row1offheap");
|
byte[] row1 = Bytes.toBytes("row1offheap");
|
||||||
byte[] qf1 = Bytes.toBytes("qualifier1");
|
byte[] qf1 = Bytes.toBytes("qualifier1");
|
||||||
byte[] qf2 = Bytes.toBytes("qualifier2");
|
byte[] qf2 = Bytes.toBytes("qualifier2");
|
||||||
|
@ -181,7 +178,7 @@ public class TestScannerFromBucketCache {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testBasicScanWithOffheapBucketCacheWithMBB() throws IOException {
|
public void testBasicScanWithOffheapBucketCacheWithMBB() throws IOException {
|
||||||
setUp(true, true);
|
setUp(true);
|
||||||
byte[] row1 = Bytes.toBytes("row1offheap");
|
byte[] row1 = Bytes.toBytes("row1offheap");
|
||||||
byte[] qf1 = Bytes.toBytes("qualifier1");
|
byte[] qf1 = Bytes.toBytes("qualifier1");
|
||||||
byte[] qf2 = Bytes.toBytes("qualifier2");
|
byte[] qf2 = Bytes.toBytes("qualifier2");
|
||||||
|
@ -231,44 +228,6 @@ public class TestScannerFromBucketCache {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testBasicScanWithOnheapBucketCache() throws IOException {
|
|
||||||
setUp(false, true);
|
|
||||||
byte[] row1 = Bytes.toBytes("row1onheap");
|
|
||||||
byte[] qf1 = Bytes.toBytes("qualifier1");
|
|
||||||
byte[] qf2 = Bytes.toBytes("qualifier2");
|
|
||||||
byte[] fam1 = Bytes.toBytes("famonheap");
|
|
||||||
|
|
||||||
long ts1 = 1; // System.currentTimeMillis();
|
|
||||||
long ts2 = ts1 + 1;
|
|
||||||
long ts3 = ts1 + 2;
|
|
||||||
|
|
||||||
// Setting up region
|
|
||||||
String method = this.getName();
|
|
||||||
this.region = initHRegion(tableName, method, conf, test_util, fam1);
|
|
||||||
try {
|
|
||||||
List<Cell> expected = insertData(row1, qf1, qf2, fam1, ts1, ts2, ts3, false);
|
|
||||||
|
|
||||||
List<Cell> actual = performScan(row1, fam1);
|
|
||||||
// Verify result
|
|
||||||
for (int i = 0; i < expected.size(); i++) {
|
|
||||||
assertFalse(actual.get(i) instanceof ByteBufferKeyValue);
|
|
||||||
assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected.get(i), actual.get(i)));
|
|
||||||
}
|
|
||||||
// do the scan again and verify. This time it should be from the bucket cache in onheap mode
|
|
||||||
actual = performScan(row1, fam1);
|
|
||||||
// Verify result
|
|
||||||
for (int i = 0; i < expected.size(); i++) {
|
|
||||||
assertFalse(actual.get(i) instanceof ByteBufferKeyValue);
|
|
||||||
assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected.get(i), actual.get(i)));
|
|
||||||
}
|
|
||||||
|
|
||||||
} finally {
|
|
||||||
HBaseTestingUtility.closeRegionAndWAL(this.region);
|
|
||||||
this.region = null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private List<Cell> insertData(byte[] row1, byte[] qf1, byte[] qf2, byte[] fam1, long ts1,
|
private List<Cell> insertData(byte[] row1, byte[] qf1, byte[] qf2, byte[] fam1, long ts1,
|
||||||
long ts2, long ts3, boolean withVal) throws IOException {
|
long ts2, long ts3, boolean withVal) throws IOException {
|
||||||
// Putting data in Region
|
// Putting data in Region
|
||||||
|
|
|
@ -20,14 +20,12 @@ package org.apache.hadoop.hbase.io.hfile.bucket;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertThat;
|
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import java.io.FileNotFoundException;
|
import java.io.FileNotFoundException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
|
@ -92,7 +90,7 @@ public class TestBucketCache {
|
||||||
final long capacitySize = 32 * 1024 * 1024;
|
final long capacitySize = 32 * 1024 * 1024;
|
||||||
final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS;
|
final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS;
|
||||||
final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS;
|
final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS;
|
||||||
String ioEngineName = "heap";
|
String ioEngineName = "offheap";
|
||||||
String persistencePath = null;
|
String persistencePath = null;
|
||||||
|
|
||||||
private class MockedBucketCache extends BucketCache {
|
private class MockedBucketCache extends BucketCache {
|
||||||
|
|
|
@ -36,7 +36,6 @@ import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.BlockingQueue;
|
import java.util.concurrent.BlockingQueue;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
|
||||||
import java.util.concurrent.atomic.LongAdder;
|
import java.util.concurrent.atomic.LongAdder;
|
||||||
|
|
||||||
import static org.hamcrest.CoreMatchers.is;
|
import static org.hamcrest.CoreMatchers.is;
|
||||||
|
@ -80,7 +79,7 @@ public class TestBucketWriterThread {
|
||||||
// Run with one writer thread only. Means there will be one writer queue only too. We depend
|
// Run with one writer thread only. Means there will be one writer queue only too. We depend
|
||||||
// on this in below.
|
// on this in below.
|
||||||
final int writerThreadsCount = 1;
|
final int writerThreadsCount = 1;
|
||||||
this.bc = new MockBucketCache("heap", capacity, 1, new int [] {1}, writerThreadsCount,
|
this.bc = new MockBucketCache("offheap", capacity, 1, new int [] {1}, writerThreadsCount,
|
||||||
capacity, null, 100/*Tolerate ioerrors for 100ms*/);
|
capacity, null, 100/*Tolerate ioerrors for 100ms*/);
|
||||||
assertEquals(writerThreadsCount, bc.writerThreads.length);
|
assertEquals(writerThreadsCount, bc.writerThreads.length);
|
||||||
assertEquals(writerThreadsCount, bc.writerQueues.size());
|
assertEquals(writerThreadsCount, bc.writerQueues.size());
|
||||||
|
|
|
@ -43,7 +43,7 @@ public class TestByteBufferIOEngine {
|
||||||
int capacity = 32 * 1024 * 1024; // 32 MB
|
int capacity = 32 * 1024 * 1024; // 32 MB
|
||||||
int testNum = 100;
|
int testNum = 100;
|
||||||
int maxBlockSize = 64 * 1024;
|
int maxBlockSize = 64 * 1024;
|
||||||
ByteBufferIOEngine ioEngine = new ByteBufferIOEngine(capacity, false);
|
ByteBufferIOEngine ioEngine = new ByteBufferIOEngine(capacity);
|
||||||
int testOffsetAtStartNum = testNum / 10;
|
int testOffsetAtStartNum = testNum / 10;
|
||||||
int testOffsetAtEndNum = testNum / 10;
|
int testOffsetAtEndNum = testNum / 10;
|
||||||
for (int i = 0; i < testNum; i++) {
|
for (int i = 0; i < testNum; i++) {
|
||||||
|
@ -113,7 +113,7 @@ public class TestByteBufferIOEngine {
|
||||||
int capacity = 32 * 1024 * 1024; // 32 MB
|
int capacity = 32 * 1024 * 1024; // 32 MB
|
||||||
int testNum = 100;
|
int testNum = 100;
|
||||||
int maxBlockSize = 64 * 1024;
|
int maxBlockSize = 64 * 1024;
|
||||||
ByteBufferIOEngine ioEngine = new ByteBufferIOEngine(capacity, false);
|
ByteBufferIOEngine ioEngine = new ByteBufferIOEngine(capacity);
|
||||||
int testOffsetAtStartNum = testNum / 10;
|
int testOffsetAtStartNum = testNum / 10;
|
||||||
int testOffsetAtEndNum = testNum / 10;
|
int testOffsetAtEndNum = testNum / 10;
|
||||||
for (int i = 0; i < testNum; i++) {
|
for (int i = 0; i < testNum; i++) {
|
||||||
|
|
|
@ -602,58 +602,6 @@ public class TestHeapMemoryManager {
|
||||||
assertEquals(oldBlockCacheSize, blockCache.maxSize);
|
assertEquals(oldBlockCacheSize, blockCache.maxSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testWhenL2BlockCacheIsOnHeap() throws Exception {
|
|
||||||
HeapMemoryManager heapMemoryManager = null;
|
|
||||||
BlockCacheStub blockCache = new BlockCacheStub((long) (maxHeapSize * 0.4));
|
|
||||||
MemstoreFlusherStub memStoreFlusher = new MemstoreFlusherStub((long) (maxHeapSize * 0.3));
|
|
||||||
Configuration conf = HBaseConfiguration.create();
|
|
||||||
conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.7f);
|
|
||||||
conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MIN_RANGE_KEY, 0.1f);
|
|
||||||
conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.7f);
|
|
||||||
conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.1f);
|
|
||||||
conf.setInt(DefaultHeapMemoryTuner.NUM_PERIODS_TO_IGNORE, 0);
|
|
||||||
conf.setFloat(MemorySizeUtil.MEMSTORE_SIZE_KEY, 0.4F);
|
|
||||||
conf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.3F);
|
|
||||||
conf.setFloat(HConstants.BUCKET_CACHE_SIZE_KEY, 0.1F);
|
|
||||||
conf.set(HConstants.BUCKET_CACHE_IOENGINE_KEY, "heap");
|
|
||||||
|
|
||||||
conf.setLong(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_PERIOD, 1000);
|
|
||||||
conf.setClass(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_CLASS, CustomHeapMemoryTuner.class,
|
|
||||||
HeapMemoryTuner.class);
|
|
||||||
|
|
||||||
try {
|
|
||||||
heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher,
|
|
||||||
new RegionServerStub(conf), new RegionServerAccountingStub(conf));
|
|
||||||
fail("Should have failed as the collective heap memory need is above 80%");
|
|
||||||
} catch (Exception e) {
|
|
||||||
}
|
|
||||||
|
|
||||||
// Change the max/min ranges for memstore and bock cache so as to pass the criteria check
|
|
||||||
conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.6f);
|
|
||||||
conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.6f);
|
|
||||||
heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher,
|
|
||||||
new RegionServerStub(conf), new RegionServerAccountingStub(conf));
|
|
||||||
long oldMemstoreSize = memStoreFlusher.memstoreSize;
|
|
||||||
long oldBlockCacheSize = blockCache.maxSize;
|
|
||||||
final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
|
|
||||||
heapMemoryManager.start(choreService);
|
|
||||||
CustomHeapMemoryTuner.memstoreSize = 0.4f;
|
|
||||||
CustomHeapMemoryTuner.blockCacheSize = 0.4f;
|
|
||||||
// Allow the tuner to run once and do necessary memory up
|
|
||||||
Thread.sleep(1500);
|
|
||||||
// The size should not get changes as the collection of memstore size and L1 and L2 block cache
|
|
||||||
// size will cross the ax allowed 80% mark
|
|
||||||
assertEquals(oldMemstoreSize, memStoreFlusher.memstoreSize);
|
|
||||||
assertEquals(oldBlockCacheSize, blockCache.maxSize);
|
|
||||||
CustomHeapMemoryTuner.memstoreSize = 0.1f;
|
|
||||||
CustomHeapMemoryTuner.blockCacheSize = 0.5f;
|
|
||||||
// Allow the tuner to run once and do necessary memory up
|
|
||||||
waitForTune(memStoreFlusher, memStoreFlusher.memstoreSize);
|
|
||||||
assertHeapSpace(0.1f, memStoreFlusher.memstoreSize);
|
|
||||||
assertHeapSpace(0.5f, blockCache.maxSize);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void assertHeapSpace(float expectedHeapPercentage, long currentHeapSpace) {
|
private void assertHeapSpace(float expectedHeapPercentage, long currentHeapSpace) {
|
||||||
long expected = (long) (this.maxHeapSize * expectedHeapPercentage);
|
long expected = (long) (this.maxHeapSize * expectedHeapPercentage);
|
||||||
assertEquals(expected, currentHeapSpace);
|
assertEquals(expected, currentHeapSpace);
|
||||||
|
|
Loading…
Reference in New Issue