HBASE-7404 Bucket Cache:A solution about CMS,Heap Fragment and Big Cache on HBASE (Chunhui)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1432797 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
zjushch 2013-01-14 04:08:17 +00:00
parent e4711dc714
commit 90ed67dc9a
26 changed files with 3348 additions and 34 deletions

View File

@ -0,0 +1,195 @@
/**
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.util;
import java.nio.ByteBuffer;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.util.StringUtils;
/**
* This class manages an array of ByteBuffers with a default size 4MB. These
* buffers are sequential and could be considered as a large buffer.It supports
* reading/writing data from this large buffer with a position and offset
*/
@InterfaceAudience.Public
public final class ByteBufferArray {
static final Log LOG = LogFactory.getLog(ByteBufferArray.class);
static final int DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024;
private ByteBuffer buffers[];
private Lock locks[];
private int bufferSize;
private int bufferCount;
/**
* We allocate a number of byte buffers as the capacity. In order not to out
* of the array bounds for the last byte(see {@link ByteBufferArray#multiple}),
* we will allocate one additional buffer with capacity 0;
* @param capacity total size of the byte buffer array
* @param directByteBuffer true if we allocate direct buffer
*/
public ByteBufferArray(long capacity, boolean directByteBuffer) {
this.bufferSize = DEFAULT_BUFFER_SIZE;
if (this.bufferSize > (capacity / 16))
this.bufferSize = (int) roundUp(capacity / 16, 32768);
this.bufferCount = (int) (roundUp(capacity, bufferSize) / bufferSize);
LOG.info("Allocating buffers total=" + StringUtils.byteDesc(capacity)
+ " , sizePerBuffer=" + StringUtils.byteDesc(bufferSize) + ", count="
+ bufferCount);
buffers = new ByteBuffer[bufferCount + 1];
locks = new Lock[bufferCount + 1];
for (int i = 0; i <= bufferCount; i++) {
locks[i] = new ReentrantLock();
if (i < bufferCount) {
buffers[i] = directByteBuffer ? ByteBuffer.allocateDirect(bufferSize)
: ByteBuffer.allocate(bufferSize);
} else {
buffers[i] = ByteBuffer.allocate(0);
}
}
}
private long roundUp(long n, long to) {
return ((n + to - 1) / to) * to;
}
/**
* Transfers bytes from this buffer array into the given destination array
* @param start start position in the ByteBufferArray
* @param len The maximum number of bytes to be written to the given array
* @param dstArray The array into which bytes are to be written
*/
public void getMultiple(long start, int len, byte[] dstArray) {
getMultiple(start, len, dstArray, 0);
}
/**
* Transfers bytes from this buffer array into the given destination array
* @param start start offset of this buffer array
* @param len The maximum number of bytes to be written to the given array
* @param dstArray The array into which bytes are to be written
* @param dstOffset The offset within the given array of the first byte to be
* written
*/
public void getMultiple(long start, int len, byte[] dstArray, int dstOffset) {
multiple(start, len, dstArray, dstOffset, new Visitor() {
public void visit(ByteBuffer bb, byte[] array, int arrayIdx, int len) {
bb.get(array, arrayIdx, len);
}
});
}
/**
* Transfers bytes from the given source array into this buffer array
* @param start start offset of this buffer array
* @param len The maximum number of bytes to be read from the given array
* @param srcArray The array from which bytes are to be read
*/
public void putMultiple(long start, int len, byte[] srcArray) {
putMultiple(start, len, srcArray, 0);
}
/**
* Transfers bytes from the given source array into this buffer array
* @param start start offset of this buffer array
* @param len The maximum number of bytes to be read from the given array
* @param srcArray The array from which bytes are to be read
* @param srcOffset The offset within the given array of the first byte to be
* read
*/
public void putMultiple(long start, int len, byte[] srcArray, int srcOffset) {
multiple(start, len, srcArray, srcOffset, new Visitor() {
public void visit(ByteBuffer bb, byte[] array, int arrayIdx, int len) {
bb.put(array, arrayIdx, len);
}
});
}
private interface Visitor {
/**
* Visit the given byte buffer, if it is a read action, we will transfer the
* bytes from the buffer to the destination array, else if it is a write
* action, we will transfer the bytes from the source array to the buffer
* @param bb byte buffer
* @param array a source or destination byte array
* @param arrayOffset offset of the byte array
* @param len read/write length
*/
void visit(ByteBuffer bb, byte[] array, int arrayOffset, int len);
}
/**
* Access(read or write) this buffer array with a position and length as the
* given array. Here we will only lock one buffer even if it may be need visit
* several buffers. The consistency is guaranteed by the caller.
* @param start start offset of this buffer array
* @param len The maximum number of bytes to be accessed
* @param array The array from/to which bytes are to be read/written
* @param arrayOffset The offset within the given array of the first byte to
* be read or written
* @param visitor implement of how to visit the byte buffer
*/
void multiple(long start, int len, byte[] array, int arrayOffset, Visitor visitor) {
assert len >= 0;
long end = start + len;
int startBuffer = (int) (start / bufferSize), startOffset = (int) (start % bufferSize);
int endBuffer = (int) (end / bufferSize), endOffset = (int) (end % bufferSize);
assert array.length >= len + arrayOffset;
assert startBuffer >= 0 && startBuffer < bufferCount;
assert endBuffer >= 0 && endBuffer < bufferCount
|| (endBuffer == bufferCount && endOffset == 0);
if (startBuffer >= locks.length || startBuffer < 0) {
String msg = "Failed multiple, start=" + start + ",startBuffer="
+ startBuffer + ",bufferSize=" + bufferSize;
LOG.error(msg);
throw new RuntimeException(msg);
}
int srcIndex = 0, cnt = -1;
for (int i = startBuffer; i <= endBuffer; ++i) {
Lock lock = locks[i];
lock.lock();
try {
ByteBuffer bb = buffers[i];
if (i == startBuffer) {
cnt = bufferSize - startOffset;
if (cnt > len) cnt = len;
bb.limit(startOffset + cnt).position(
startOffset );
} else if (i == endBuffer) {
cnt = endOffset;
bb.limit(cnt).position(0);
} else {
cnt = bufferSize ;
bb.limit(cnt).position(0);
}
visitor.visit(bb, array, srcIndex + arrayOffset, cnt);
srcIndex += cnt;
} finally {
lock.unlock();
}
}
assert srcIndex == len;
}
}

View File

@ -27,7 +27,7 @@ import org.apache.hadoop.hbase.util.ClassSize;
* Cache Key for use with implementations of {@link BlockCache}
*/
@InterfaceAudience.Private
public class BlockCacheKey implements HeapSize {
public class BlockCacheKey implements HeapSize, java.io.Serializable {
private final String hfileName;
private final long offset;
private final DataBlockEncoding encoding;

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.io.hfile;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;
@ -27,6 +28,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.DirectMemoryUtils;
import org.apache.hadoop.util.StringUtils;
@ -72,6 +74,28 @@ public class CacheConfig {
public static final String EVICT_BLOCKS_ON_CLOSE_KEY =
"hbase.rs.evictblocksonclose";
/**
* Configuration keys for Bucket cache
*/
public static final String BUCKET_CACHE_IOENGINE_KEY = "hbase.bucketcache.ioengine";
public static final String BUCKET_CACHE_SIZE_KEY = "hbase.bucketcache.size";
public static final String BUCKET_CACHE_PERSISTENT_PATH_KEY =
"hbase.bucketcache.persistent.path";
public static final String BUCKET_CACHE_COMBINED_KEY =
"hbase.bucketcache.combinedcache.enabled";
public static final String BUCKET_CACHE_COMBINED_PERCENTAGE_KEY =
"hbase.bucketcache.percentage.in.combinedcache";
public static final String BUCKET_CACHE_WRITER_THREADS_KEY = "hbase.bucketcache.writer.threads";
public static final String BUCKET_CACHE_WRITER_QUEUE_KEY =
"hbase.bucketcache.writer.queuelength";
/**
* Defaults for Bucket cache
*/
public static final boolean DEFAULT_BUCKET_CACHE_COMBINED = true;
public static final int DEFAULT_BUCKET_CACHE_WRITER_THREADS = 3;
public static final int DEFAULT_BUCKET_CACHE_WRITER_QUEUE = 64;
public static final float DEFAULT_BUCKET_CACHE_COMBINED_PERCENTAGE = 0.9f;
// Defaults
public static final boolean DEFAULT_CACHE_DATA_ON_READ = true;
@ -341,19 +365,60 @@ public class CacheConfig {
// Calculate the amount of heap to give the heap.
MemoryUsage mu = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
long cacheSize = (long)(mu.getMax() * cachePercentage);
long lruCacheSize = (long) (mu.getMax() * cachePercentage);
int blockSize = conf.getInt("hbase.offheapcache.minblocksize",
HFile.DEFAULT_BLOCKSIZE);
long offHeapCacheSize =
(long) (conf.getFloat("hbase.offheapcache.percentage", (float) 0) *
DirectMemoryUtils.getDirectMemorySize());
LOG.info("Allocating LruBlockCache with maximum size " +
StringUtils.humanReadableInt(cacheSize));
if (offHeapCacheSize <= 0) {
globalBlockCache = new LruBlockCache(cacheSize,
StoreFile.DEFAULT_BLOCKSIZE_SMALL, conf);
String bucketCacheIOEngineName = conf
.get(BUCKET_CACHE_IOENGINE_KEY, null);
float bucketCachePercentage = conf.getFloat(BUCKET_CACHE_SIZE_KEY, 0F);
// A percentage of max heap size or a absolute value with unit megabytes
long bucketCacheSize = (long) (bucketCachePercentage < 1 ? mu.getMax()
* bucketCachePercentage : bucketCachePercentage * 1024 * 1024);
boolean combinedWithLru = conf.getBoolean(BUCKET_CACHE_COMBINED_KEY,
DEFAULT_BUCKET_CACHE_COMBINED);
BucketCache bucketCache = null;
if (bucketCacheIOEngineName != null && bucketCacheSize > 0) {
int writerThreads = conf.getInt(BUCKET_CACHE_WRITER_THREADS_KEY,
DEFAULT_BUCKET_CACHE_WRITER_THREADS);
int writerQueueLen = conf.getInt(BUCKET_CACHE_WRITER_QUEUE_KEY,
DEFAULT_BUCKET_CACHE_WRITER_QUEUE);
String persistentPath = conf.get(BUCKET_CACHE_PERSISTENT_PATH_KEY);
float combinedPercentage = conf.getFloat(
BUCKET_CACHE_COMBINED_PERCENTAGE_KEY,
DEFAULT_BUCKET_CACHE_COMBINED_PERCENTAGE);
if (combinedWithLru) {
lruCacheSize = (long) ((1 - combinedPercentage) * bucketCacheSize);
bucketCacheSize = (long) (combinedPercentage * bucketCacheSize);
}
try {
int ioErrorsTolerationDuration = conf.getInt(
"hbase.bucketcache.ioengine.errors.tolerated.duration",
BucketCache.DEFAULT_ERROR_TOLERATION_DURATION);
bucketCache = new BucketCache(bucketCacheIOEngineName,
bucketCacheSize, writerThreads, writerQueueLen, persistentPath,
ioErrorsTolerationDuration);
} catch (IOException ioex) {
LOG.error("Can't instantiate bucket cache", ioex);
throw new RuntimeException(ioex);
}
}
LOG.info("Allocating LruBlockCache with maximum size "
+ StringUtils.humanReadableInt(lruCacheSize));
LruBlockCache lruCache = new LruBlockCache(lruCacheSize,
StoreFile.DEFAULT_BLOCKSIZE_SMALL);
lruCache.setVictimCache(bucketCache);
if (bucketCache != null && combinedWithLru) {
globalBlockCache = new CombinedBlockCache(lruCache, bucketCache);
} else {
globalBlockCache = lruCache;
}
} else {
globalBlockCache = new DoubleBlockCache(cacheSize, offHeapCacheSize,
globalBlockCache = new DoubleBlockCache(lruCacheSize, offHeapCacheSize,
StoreFile.DEFAULT_BLOCKSIZE_SMALL, blockSize, conf);
}
return globalBlockCache;

View File

@ -171,6 +171,22 @@ public class CacheStats {
windowIndex = (windowIndex + 1) % numPeriodsInWindow;
}
public long getSumHitCountsPastNPeriods() {
return sum(hitCounts);
}
public long getSumRequestCountsPastNPeriods() {
return sum(requestCounts);
}
public long getSumHitCachingCountsPastNPeriods() {
return sum(hitCachingCounts);
}
public long getSumRequestCachingCountsPastNPeriods() {
return sum(requestCachingCounts);
}
public double getHitRatioPastNPeriods() {
double ratio = ((double)sum(hitCounts)/(double)sum(requestCounts));
return Double.isNaN(ratio) ? 0 : ratio;

View File

@ -56,4 +56,9 @@ public interface Cacheable extends HeapSize {
*/
public CacheableDeserializer<Cacheable> getDeserializer();
/**
* @return the block type of this cached HFile block
*/
public BlockType getBlockType();
}

View File

@ -34,4 +34,21 @@ public interface CacheableDeserializer<T extends Cacheable> {
* @return T the deserialized object.
*/
public T deserialize(ByteBuffer b) throws IOException;
/**
*
* @param b
* @param reuse true if Cacheable object can use the given buffer as its
* content
* @return T the deserialized object.
* @throws IOException
*/
public T deserialize(ByteBuffer b, boolean reuse) throws IOException;
/**
* Get the identifier of this deserialiser. Identifier is unique for each
* deserializer and generated by {@link CacheableDeserializerIdManager}
* @return identifier number of this cacheable deserializer
*/
public int getDeserialiserIdentifier();
}

View File

@ -0,0 +1,59 @@
/**
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.io.hfile;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.classification.InterfaceAudience;
/**
* This class is used to manage the identifiers for
* {@link CacheableDeserializer}
*/
@InterfaceAudience.Private
public class CacheableDeserializerIdManager {
private static final Map<Integer, CacheableDeserializer<Cacheable>> registeredDeserializers =
new HashMap<Integer, CacheableDeserializer<Cacheable>>();
private static final AtomicInteger identifier = new AtomicInteger(0);
/**
* Register the given cacheable deserializer and generate an unique identifier
* id for it
* @param cd
* @return the identifier of given cacheable deserializer
*/
public static int registerDeserializer(CacheableDeserializer<Cacheable> cd) {
int idx = identifier.incrementAndGet();
synchronized (registeredDeserializers) {
registeredDeserializers.put(idx, cd);
}
return idx;
}
/**
* Get the cacheable deserializer as the given identifier Id
* @param id
* @return CacheableDeserializer
*/
public static CacheableDeserializer<Cacheable> getDeserializer(int id) {
return registeredDeserializers.get(id);
}
}

View File

@ -0,0 +1,215 @@
/**
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.io.hfile;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
/**
* CombinedBlockCache is an abstraction layer that combines
* {@link LruBlockCache} and {@link BucketCache}. The smaller lruCache is used
* to cache bloom blocks and index blocks , the larger bucketCache is used to
* cache data blocks. getBlock reads first from the smaller lruCache before
* looking for the block in the bucketCache. Metrics are the combined size and
* hits and misses of both caches.
*
**/
@InterfaceAudience.Private
public class CombinedBlockCache implements BlockCache, HeapSize {
private final LruBlockCache lruCache;
private final BucketCache bucketCache;
private final CombinedCacheStats combinedCacheStats;
public CombinedBlockCache(LruBlockCache lruCache, BucketCache bucketCache) {
this.lruCache = lruCache;
this.bucketCache = bucketCache;
this.combinedCacheStats = new CombinedCacheStats(lruCache.getStats(),
bucketCache.getStats());
}
@Override
public long heapSize() {
return lruCache.heapSize() + bucketCache.heapSize();
}
@Override
public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) {
boolean isMetaBlock = buf.getBlockType().getCategory() != BlockCategory.DATA;
if (isMetaBlock) {
lruCache.cacheBlock(cacheKey, buf, inMemory);
} else {
bucketCache.cacheBlock(cacheKey, buf, inMemory);
}
}
@Override
public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
cacheBlock(cacheKey, buf, false);
}
@Override
public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching,
boolean repeat) {
if (lruCache.containsBlock(cacheKey)) {
return lruCache.getBlock(cacheKey, caching, repeat);
}
return bucketCache.getBlock(cacheKey, caching, repeat);
}
@Override
public boolean evictBlock(BlockCacheKey cacheKey) {
return lruCache.evictBlock(cacheKey) || bucketCache.evictBlock(cacheKey);
}
@Override
public int evictBlocksByHfileName(String hfileName) {
return lruCache.evictBlocksByHfileName(hfileName)
+ bucketCache.evictBlocksByHfileName(hfileName);
}
@Override
public CacheStats getStats() {
return this.combinedCacheStats;
}
@Override
public void shutdown() {
lruCache.shutdown();
bucketCache.shutdown();
}
@Override
public long size() {
return lruCache.size() + bucketCache.size();
}
@Override
public long getFreeSize() {
return lruCache.getFreeSize() + bucketCache.getFreeSize();
}
@Override
public long getCurrentSize() {
return lruCache.getCurrentSize() + bucketCache.getCurrentSize();
}
@Override
public long getEvictedCount() {
return lruCache.getEvictedCount() + bucketCache.getEvictedCount();
}
@Override
public long getBlockCount() {
return lruCache.getBlockCount() + bucketCache.getBlockCount();
}
@Override
public List<BlockCacheColumnFamilySummary> getBlockCacheColumnFamilySummaries(
Configuration conf) throws IOException {
throw new UnsupportedOperationException();
}
private static class CombinedCacheStats extends CacheStats {
private final CacheStats lruCacheStats;
private final CacheStats bucketCacheStats;
CombinedCacheStats(CacheStats lbcStats, CacheStats fcStats) {
this.lruCacheStats = lbcStats;
this.bucketCacheStats = fcStats;
}
@Override
public long getRequestCount() {
return lruCacheStats.getRequestCount()
+ bucketCacheStats.getRequestCount();
}
@Override
public long getRequestCachingCount() {
return lruCacheStats.getRequestCachingCount()
+ bucketCacheStats.getRequestCachingCount();
}
@Override
public long getMissCount() {
return lruCacheStats.getMissCount() + bucketCacheStats.getMissCount();
}
@Override
public long getMissCachingCount() {
return lruCacheStats.getMissCachingCount()
+ bucketCacheStats.getMissCachingCount();
}
@Override
public long getHitCount() {
return lruCacheStats.getHitCount() + bucketCacheStats.getHitCount();
}
@Override
public long getHitCachingCount() {
return lruCacheStats.getHitCachingCount()
+ bucketCacheStats.getHitCachingCount();
}
@Override
public long getEvictionCount() {
return lruCacheStats.getEvictionCount()
+ bucketCacheStats.getEvictionCount();
}
@Override
public long getEvictedCount() {
return lruCacheStats.getEvictedCount()
+ bucketCacheStats.getEvictedCount();
}
@Override
public double getHitRatioPastNPeriods() {
double ratio = ((double) (lruCacheStats.getSumHitCountsPastNPeriods() + bucketCacheStats
.getSumHitCountsPastNPeriods()) / (double) (lruCacheStats
.getSumRequestCountsPastNPeriods() + bucketCacheStats
.getSumRequestCountsPastNPeriods()));
return Double.isNaN(ratio) ? 0 : ratio;
}
@Override
public double getHitCachingRatioPastNPeriods() {
double ratio = ((double) (lruCacheStats
.getSumHitCachingCountsPastNPeriods() + bucketCacheStats
.getSumHitCachingCountsPastNPeriods()) / (double) (lruCacheStats
.getSumRequestCachingCountsPastNPeriods() + bucketCacheStats
.getSumRequestCachingCountsPastNPeriods()));
return Double.isNaN(ratio) ? 0 : ratio;
}
}
}

View File

@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.regionserver.MemStore;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
@ -129,8 +130,9 @@ public class HFileBlock implements Cacheable {
public static final int BYTE_BUFFER_HEAP_SIZE = (int) ClassSize.estimateBase(
ByteBuffer.wrap(new byte[0], 0, 0).getClass(), false);
static final int EXTRA_SERIALIZATION_SPACE = Bytes.SIZEOF_LONG +
Bytes.SIZEOF_INT;
// minorVersion+offset+nextBlockOnDiskSizeWithHeader
public static final int EXTRA_SERIALIZATION_SPACE = 2 * Bytes.SIZEOF_INT
+ Bytes.SIZEOF_LONG;
/**
* Each checksum value is an integer that can be stored in 4 bytes.
@ -139,22 +141,39 @@ public class HFileBlock implements Cacheable {
private static final CacheableDeserializer<Cacheable> blockDeserializer =
new CacheableDeserializer<Cacheable>() {
public HFileBlock deserialize(ByteBuffer buf) throws IOException{
ByteBuffer newByteBuffer = ByteBuffer.allocate(buf.limit()
- HFileBlock.EXTRA_SERIALIZATION_SPACE);
buf.limit(buf.limit()
- HFileBlock.EXTRA_SERIALIZATION_SPACE).rewind();
newByteBuffer.put(buf);
HFileBlock ourBuffer = new HFileBlock(newByteBuffer,
MINOR_VERSION_NO_CHECKSUM);
public HFileBlock deserialize(ByteBuffer buf, boolean reuse) throws IOException{
buf.limit(buf.limit() - HFileBlock.EXTRA_SERIALIZATION_SPACE).rewind();
ByteBuffer newByteBuffer;
if (reuse) {
newByteBuffer = buf.slice();
} else {
newByteBuffer = ByteBuffer.allocate(buf.limit());
newByteBuffer.put(buf);
}
buf.position(buf.limit());
buf.limit(buf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE);
int minorVersion=buf.getInt();
HFileBlock ourBuffer = new HFileBlock(newByteBuffer, minorVersion);
ourBuffer.offset = buf.getLong();
ourBuffer.nextBlockOnDiskSizeWithHeader = buf.getInt();
return ourBuffer;
}
@Override
public int getDeserialiserIdentifier() {
return deserializerIdentifier;
}
@Override
public HFileBlock deserialize(ByteBuffer b) throws IOException {
return deserialize(b, false);
}
};
private static final int deserializerIdentifier;
static {
deserializerIdentifier = CacheableDeserializerIdManager
.registerDeserializer(blockDeserializer);
}
private BlockType blockType;
@ -358,6 +377,17 @@ public class HFileBlock implements Cacheable {
buf.limit() - totalChecksumBytes()).slice();
}
/**
* Returns the buffer of this block, including header data. The clients must
* not modify the buffer object. This method has to be public because it is
* used in {@link BucketCache} to avoid buffer copy.
*
* @return the byte buffer with header included for read-only operations
*/
public ByteBuffer getBufferReadOnlyWithHeader() {
return ByteBuffer.wrap(buf.array(), buf.arrayOffset(), buf.limit()).slice();
}
/**
* Returns a byte buffer of this block, including header data, positioned at
* the beginning of header. The underlying data array is not copied.
@ -1780,7 +1810,17 @@ public class HFileBlock implements Cacheable {
@Override
public void serialize(ByteBuffer destination) {
destination.put(this.buf.duplicate());
ByteBuffer dupBuf = this.buf.duplicate();
dupBuf.rewind();
destination.put(dupBuf);
destination.putInt(this.minorVersion);
destination.putLong(this.offset);
destination.putInt(this.nextBlockOnDiskSizeWithHeader);
destination.rewind();
}
public void serializeExtraInfo(ByteBuffer destination) {
destination.putInt(this.minorVersion);
destination.putLong(this.offset);
destination.putInt(this.nextBlockOnDiskSizeWithHeader);
destination.rewind();

View File

@ -44,6 +44,8 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.CachedBlock.BlockPriority;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.FSUtils;
@ -173,6 +175,9 @@ public class LruBlockCache implements BlockCache, HeapSize {
/** Overhead of the structure itself */
private long overhead;
/** Where to send victims (blocks evicted from the cache) */
private BucketCache victimHandler = null;
/**
* Default constructor. Specify maximum size and expected average block
* size (approximation is fine).
@ -342,6 +347,8 @@ public class LruBlockCache implements BlockCache, HeapSize {
CachedBlock cb = map.get(cacheKey);
if(cb == null) {
if (!repeat) stats.miss(caching);
if (victimHandler != null)
return victimHandler.getBlock(cacheKey, caching, repeat);
return null;
}
stats.hit(caching);
@ -349,12 +356,20 @@ public class LruBlockCache implements BlockCache, HeapSize {
return cb.getBuffer();
}
/**
* Whether the cache contains block with specified cacheKey
* @param cacheKey
* @return true if contains the block
*/
public boolean containsBlock(BlockCacheKey cacheKey) {
return map.containsKey(cacheKey);
}
@Override
public boolean evictBlock(BlockCacheKey cacheKey) {
CachedBlock cb = map.get(cacheKey);
if (cb == null) return false;
evictBlock(cb);
evictBlock(cb, false);
return true;
}
@ -377,14 +392,31 @@ public class LruBlockCache implements BlockCache, HeapSize {
++numEvicted;
}
}
if (victimHandler != null) {
numEvicted += victimHandler.evictBlocksByHfileName(hfileName);
}
return numEvicted;
}
protected long evictBlock(CachedBlock block) {
/**
* Evict the block, and it will be cached by the victim handler if exists &&
* block may be read again later
* @param block
* @param evictedByEvictionProcess true if the given block is evicted by
* EvictionThread
* @return the heap size of evicted block
*/
protected long evictBlock(CachedBlock block, boolean evictedByEvictionProcess) {
map.remove(block.getCacheKey());
updateSizeMetrics(block, true);
elements.decrementAndGet();
stats.evicted();
if (evictedByEvictionProcess && victimHandler != null) {
boolean wait = getCurrentSize() < acceptableSize();
boolean inMemory = block.getPriority() == BlockPriority.MEMORY;
victimHandler.cacheBlockWithWait(block.getCacheKey(), block.getBuffer(),
inMemory, wait);
}
return block.heapSize();
}
@ -512,7 +544,7 @@ public class LruBlockCache implements BlockCache, HeapSize {
CachedBlock cb;
long freedBytes = 0;
while ((cb = queue.pollLast()) != null) {
freedBytes += evictBlock(cb);
freedBytes += evictBlock(cb, true);
if (freedBytes >= toFree) {
return freedBytes;
}
@ -703,7 +735,7 @@ public class LruBlockCache implements BlockCache, HeapSize {
}
public final static long CACHE_FIXED_OVERHEAD = ClassSize.align(
(3 * Bytes.SIZEOF_LONG) + (8 * ClassSize.REFERENCE) +
(3 * Bytes.SIZEOF_LONG) + (9 * ClassSize.REFERENCE) +
(5 * Bytes.SIZEOF_FLOAT) + Bytes.SIZEOF_BOOLEAN
+ ClassSize.OBJECT);
@ -772,6 +804,8 @@ public class LruBlockCache implements BlockCache, HeapSize {
}
public void shutdown() {
if (victimHandler != null)
victimHandler.shutdown();
this.scheduleThreadPool.shutdown();
for (int i = 0; i < 10; i++) {
if (!this.scheduleThreadPool.isShutdown()) Threads.sleep(10);
@ -822,4 +856,9 @@ public class LruBlockCache implements BlockCache, HeapSize {
return counts;
}
public void setVictimCache(BucketCache handler) {
assert victimHandler == null;
victimHandler = handler;
}
}

View File

@ -0,0 +1,549 @@
/**
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile.bucket;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BucketEntry;
/**
* This class is used to allocate a block with specified size and free the block
* when evicting. It manages an array of buckets, each bucket is associated with
* a size and caches elements up to this size. For completely empty bucket, this
* size could be re-specified dynamically.
*
* This class is not thread safe.
*/
@InterfaceAudience.Private
public final class BucketAllocator {
static final Log LOG = LogFactory.getLog(BucketAllocator.class);
final private static class Bucket {
private long baseOffset;
private int itemAllocationSize, sizeIndex;
private int itemCount;
private int freeList[];
private int freeCount, usedCount;
public Bucket(long offset) {
baseOffset = offset;
sizeIndex = -1;
}
void reconfigure(int sizeIndex) {
this.sizeIndex = sizeIndex;
assert sizeIndex < BUCKET_SIZES.length;
itemAllocationSize = BUCKET_SIZES[sizeIndex];
itemCount = (int) (((long) BUCKET_CAPACITY) / (long) itemAllocationSize);
freeCount = itemCount;
usedCount = 0;
freeList = new int[itemCount];
for (int i = 0; i < freeCount; ++i)
freeList[i] = i;
}
public boolean isUninstantiated() {
return sizeIndex == -1;
}
public int sizeIndex() {
return sizeIndex;
}
public int itemAllocationSize() {
return itemAllocationSize;
}
public boolean hasFreeSpace() {
return freeCount > 0;
}
public boolean isCompletelyFree() {
return usedCount == 0;
}
public int freeCount() {
return freeCount;
}
public int usedCount() {
return usedCount;
}
public int freeBytes() {
return freeCount * itemAllocationSize;
}
public int usedBytes() {
return usedCount * itemAllocationSize;
}
public long baseOffset() {
return baseOffset;
}
/**
* Allocate a block in this bucket, return the offset representing the
* position in physical space
* @return the offset in the IOEngine
*/
public long allocate() {
assert freeCount > 0; // Else should not have been called
assert sizeIndex != -1;
++usedCount;
long offset = baseOffset + (freeList[--freeCount] * itemAllocationSize);
assert offset >= 0;
return offset;
}
public void addAllocation(long offset) throws BucketAllocatorException {
offset -= baseOffset;
if (offset < 0 || offset % itemAllocationSize != 0)
throw new BucketAllocatorException(
"Attempt to add allocation for bad offset: " + offset + " base="
+ baseOffset + ", bucket size=" + itemAllocationSize);
int idx = (int) (offset / itemAllocationSize);
boolean matchFound = false;
for (int i = 0; i < freeCount; ++i) {
if (matchFound) freeList[i - 1] = freeList[i];
else if (freeList[i] == idx) matchFound = true;
}
if (!matchFound)
throw new BucketAllocatorException("Couldn't find match for index "
+ idx + " in free list");
++usedCount;
--freeCount;
}
private void free(long offset) {
offset -= baseOffset;
assert offset >= 0;
assert offset < itemCount * itemAllocationSize;
assert offset % itemAllocationSize == 0;
assert usedCount > 0;
assert freeCount < itemCount; // Else duplicate free
int item = (int) (offset / (long) itemAllocationSize);
assert !freeListContains(item);
--usedCount;
freeList[freeCount++] = item;
}
private boolean freeListContains(int blockNo) {
for (int i = 0; i < freeCount; ++i) {
if (freeList[i] == blockNo) return true;
}
return false;
}
}
final class BucketSizeInfo {
// Free bucket means it has space to allocate a block;
// Completely free bucket means it has no block.
private List<Bucket> bucketList, freeBuckets, completelyFreeBuckets;
private int sizeIndex;
BucketSizeInfo(int sizeIndex) {
bucketList = new ArrayList<Bucket>();
freeBuckets = new ArrayList<Bucket>();
completelyFreeBuckets = new ArrayList<Bucket>();
this.sizeIndex = sizeIndex;
}
public void instantiateBucket(Bucket b) {
assert b.isUninstantiated() || b.isCompletelyFree();
b.reconfigure(sizeIndex);
bucketList.add(b);
freeBuckets.add(b);
completelyFreeBuckets.add(b);
}
public int sizeIndex() {
return sizeIndex;
}
/**
* Find a bucket to allocate a block
* @return the offset in the IOEngine
*/
public long allocateBlock() {
Bucket b = null;
if (freeBuckets.size() > 0) // Use up an existing one first...
b = freeBuckets.get(freeBuckets.size() - 1);
if (b == null) {
b = grabGlobalCompletelyFreeBucket();
if (b != null) instantiateBucket(b);
}
if (b == null) return -1;
long result = b.allocate();
blockAllocated(b);
return result;
}
void blockAllocated(Bucket b) {
if (!b.isCompletelyFree()) completelyFreeBuckets.remove(b);
if (!b.hasFreeSpace()) freeBuckets.remove(b);
}
public Bucket findAndRemoveCompletelyFreeBucket() {
Bucket b = null;
assert bucketList.size() > 0;
if (bucketList.size() == 1) {
// So we never get complete starvation of a bucket for a size
return null;
}
if (completelyFreeBuckets.size() > 0) {
b = completelyFreeBuckets.get(0);
removeBucket(b);
}
return b;
}
private void removeBucket(Bucket b) {
assert b.isCompletelyFree();
bucketList.remove(b);
freeBuckets.remove(b);
completelyFreeBuckets.remove(b);
}
public void freeBlock(Bucket b, long offset) {
assert bucketList.contains(b);
// else we shouldn't have anything to free...
assert (!completelyFreeBuckets.contains(b));
b.free(offset);
if (!freeBuckets.contains(b)) freeBuckets.add(b);
if (b.isCompletelyFree()) completelyFreeBuckets.add(b);
}
public IndexStatistics statistics() {
long free = 0, used = 0;
for (Bucket b : bucketList) {
free += b.freeCount();
used += b.usedCount();
}
return new IndexStatistics(free, used, BUCKET_SIZES[sizeIndex]);
}
}
// Default block size is 64K, so we choose more sizes near 64K, you'd better
// reset it according to your cluster's block size distribution
// TODO Make these sizes configurable
// TODO Support the view of block size distribution statistics
private static final int BUCKET_SIZES[] = { 4 * 1024 + 1024, 8 * 1024 + 1024,
16 * 1024 + 1024, 32 * 1024 + 1024, 40 * 1024 + 1024, 48 * 1024 + 1024,
56 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024, 128 * 1024 + 1024,
192 * 1024 + 1024, 256 * 1024 + 1024, 384 * 1024 + 1024,
512 * 1024 + 1024 };
/**
* Round up the given block size to bucket size, and get the corresponding
* BucketSizeInfo
* @param blockSize
* @return BucketSizeInfo
*/
public BucketSizeInfo roundUpToBucketSizeInfo(int blockSize) {
for (int i = 0; i < BUCKET_SIZES.length; ++i)
if (blockSize <= BUCKET_SIZES[i])
return bucketSizeInfos[i];
return null;
}
static final int BIG_ITEM_SIZE = (512 * 1024) + 1024; // 513K plus overhead
static public final int FEWEST_ITEMS_IN_BUCKET = 4;
// The capacity size for each bucket
static final long BUCKET_CAPACITY = FEWEST_ITEMS_IN_BUCKET * BIG_ITEM_SIZE;
private Bucket[] buckets;
private BucketSizeInfo[] bucketSizeInfos;
private final long totalSize;
private long usedSize = 0;
BucketAllocator(long availableSpace) throws BucketAllocatorException {
buckets = new Bucket[(int) (availableSpace / (long) BUCKET_CAPACITY)];
if (buckets.length < BUCKET_SIZES.length)
throw new BucketAllocatorException(
"Bucket allocator size too small - must have room for at least "
+ BUCKET_SIZES.length + " buckets");
bucketSizeInfos = new BucketSizeInfo[BUCKET_SIZES.length];
for (int i = 0; i < BUCKET_SIZES.length; ++i) {
bucketSizeInfos[i] = new BucketSizeInfo(i);
}
for (int i = 0; i < buckets.length; ++i) {
buckets[i] = new Bucket(BUCKET_CAPACITY * i);
bucketSizeInfos[i < BUCKET_SIZES.length ? i : BUCKET_SIZES.length - 1]
.instantiateBucket(buckets[i]);
}
this.totalSize = ((long) buckets.length) * BUCKET_CAPACITY;
}
/**
* Rebuild the allocator's data structures from a persisted map.
* @param availableSpace capacity of cache
* @param map A map stores the block key and BucketEntry(block's meta data
* like offset, length)
* @param realCacheSize cached data size statistics for bucket cache
* @throws BucketAllocatorException
*/
BucketAllocator(long availableSpace, Map<BlockCacheKey, BucketEntry> map,
AtomicLong realCacheSize) throws BucketAllocatorException {
this(availableSpace);
// each bucket has an offset, sizeindex. probably the buckets are too big
// in our default state. so what we do is reconfigure them according to what
// we've found. we can only reconfigure each bucket once; if more than once,
// we know there's a bug, so we just log the info, throw, and start again...
boolean[] reconfigured = new boolean[buckets.length];
for (Map.Entry<BlockCacheKey, BucketEntry> entry : map.entrySet()) {
long foundOffset = entry.getValue().offset();
int foundLen = entry.getValue().getLength();
int bucketSizeIndex = -1;
for (int i = 0; i < BUCKET_SIZES.length; ++i) {
if (foundLen <= BUCKET_SIZES[i]) {
bucketSizeIndex = i;
break;
}
}
if (bucketSizeIndex == -1) {
throw new BucketAllocatorException(
"Can't match bucket size for the block with size " + foundLen);
}
int bucketNo = (int) (foundOffset / (long) BUCKET_CAPACITY);
if (bucketNo < 0 || bucketNo >= buckets.length)
throw new BucketAllocatorException("Can't find bucket " + bucketNo
+ ", total buckets=" + buckets.length
+ "; did you shrink the cache?");
Bucket b = buckets[bucketNo];
if (reconfigured[bucketNo] == true) {
if (b.sizeIndex() != bucketSizeIndex)
throw new BucketAllocatorException(
"Inconsistent allocation in bucket map;");
} else {
if (!b.isCompletelyFree())
throw new BucketAllocatorException("Reconfiguring bucket "
+ bucketNo + " but it's already allocated; corrupt data");
// Need to remove the bucket from whichever list it's currently in at
// the moment...
BucketSizeInfo bsi = bucketSizeInfos[bucketSizeIndex];
BucketSizeInfo oldbsi = bucketSizeInfos[b.sizeIndex()];
oldbsi.removeBucket(b);
bsi.instantiateBucket(b);
reconfigured[bucketNo] = true;
}
realCacheSize.addAndGet(foundLen);
buckets[bucketNo].addAllocation(foundOffset);
usedSize += buckets[bucketNo].itemAllocationSize();
bucketSizeInfos[bucketSizeIndex].blockAllocated(b);
}
}
public String getInfo() {
StringBuilder sb = new StringBuilder(1024);
for (int i = 0; i < buckets.length; ++i) {
Bucket b = buckets[i];
sb.append(" Bucket ").append(i).append(": ").append(b.itemAllocationSize());
sb.append(" freeCount=").append(b.freeCount()).append(" used=")
.append(b.usedCount());
sb.append('\n');
}
return sb.toString();
}
public long getUsedSize() {
return this.usedSize;
}
public long getFreeSize() {
long freeSize = this.totalSize - getUsedSize();
return freeSize;
}
public long getTotalSize() {
return this.totalSize;
}
/**
* Allocate a block with specified size. Return the offset
* @param blockSize size of block
* @throws BucketAllocatorException,CacheFullException
* @return the offset in the IOEngine
*/
public synchronized long allocateBlock(int blockSize) throws CacheFullException,
BucketAllocatorException {
assert blockSize > 0;
BucketSizeInfo bsi = roundUpToBucketSizeInfo(blockSize);
if (bsi == null) {
throw new BucketAllocatorException("Allocation too big size=" + blockSize);
}
long offset = bsi.allocateBlock();
// Ask caller to free up space and try again!
if (offset < 0)
throw new CacheFullException(blockSize, bsi.sizeIndex());
usedSize += BUCKET_SIZES[bsi.sizeIndex()];
return offset;
}
private Bucket grabGlobalCompletelyFreeBucket() {
for (BucketSizeInfo bsi : bucketSizeInfos) {
Bucket b = bsi.findAndRemoveCompletelyFreeBucket();
if (b != null) return b;
}
return null;
}
/**
* Free a block with the offset
* @param offset block's offset
* @return size freed
*/
public synchronized int freeBlock(long offset) {
int bucketNo = (int) (offset / (long) BUCKET_CAPACITY);
assert bucketNo >= 0 && bucketNo < buckets.length;
Bucket targetBucket = buckets[bucketNo];
bucketSizeInfos[targetBucket.sizeIndex()].freeBlock(targetBucket, offset);
usedSize -= targetBucket.itemAllocationSize();
return targetBucket.itemAllocationSize();
}
public int sizeIndexOfAllocation(long offset) {
int bucketNo = (int) (offset / (long) BUCKET_CAPACITY);
assert bucketNo >= 0 && bucketNo < buckets.length;
Bucket targetBucket = buckets[bucketNo];
return targetBucket.sizeIndex();
}
public int sizeOfAllocation(long offset) {
int bucketNo = (int) (offset / (long) BUCKET_CAPACITY);
assert bucketNo >= 0 && bucketNo < buckets.length;
Bucket targetBucket = buckets[bucketNo];
return targetBucket.itemAllocationSize();
}
static public int getMaximumAllocationIndex() {
return BUCKET_SIZES.length;
}
static class IndexStatistics {
private long freeCount, usedCount, itemSize, totalCount;
public long freeCount() {
return freeCount;
}
public long usedCount() {
return usedCount;
}
public long totalCount() {
return totalCount;
}
public long freeBytes() {
return freeCount * itemSize;
}
public long usedBytes() {
return usedCount * itemSize;
}
public long totalBytes() {
return totalCount * itemSize;
}
public long itemSize() {
return itemSize;
}
public IndexStatistics(long free, long used, long itemSize) {
setTo(free, used, itemSize);
}
public IndexStatistics() {
setTo(-1, -1, 0);
}
public void setTo(long free, long used, long itemSize) {
this.itemSize = itemSize;
this.freeCount = free;
this.usedCount = used;
this.totalCount = free + used;
}
}
public void dumpToLog() {
logStatistics();
StringBuilder sb = new StringBuilder();
for (Bucket b : buckets) {
sb.append("Bucket:").append(b.baseOffset).append('\n');
sb.append(" Size index: " + b.sizeIndex() + "; Free:" + b.freeCount
+ "; used:" + b.usedCount + "; freelist\n");
for (int i = 0; i < b.freeCount(); ++i)
sb.append(b.freeList[i]).append(',');
sb.append('\n');
}
LOG.info(sb);
}
public void logStatistics() {
IndexStatistics total = new IndexStatistics();
IndexStatistics[] stats = getIndexStatistics(total);
LOG.info("Bucket allocator statistics follow:\n");
LOG.info(" Free bytes=" + total.freeBytes() + "+; used bytes="
+ total.usedBytes() + "; total bytes=" + total.totalBytes());
for (IndexStatistics s : stats) {
LOG.info(" Object size " + s.itemSize() + " used=" + s.usedCount()
+ "; free=" + s.freeCount() + "; total=" + s.totalCount());
}
}
public IndexStatistics[] getIndexStatistics(IndexStatistics grandTotal) {
IndexStatistics[] stats = getIndexStatistics();
long totalfree = 0, totalused = 0;
for (IndexStatistics stat : stats) {
totalfree += stat.freeBytes();
totalused += stat.usedBytes();
}
grandTotal.setTo(totalfree, totalused, 1);
return stats;
}
public IndexStatistics[] getIndexStatistics() {
IndexStatistics[] stats = new IndexStatistics[BUCKET_SIZES.length];
for (int i = 0; i < stats.length; ++i)
stats[i] = bucketSizeInfos[i].statistics();
return stats;
}
public long freeBlock(long freeList[]) {
long sz = 0;
for (int i = 0; i < freeList.length; ++i)
sz += freeBlock(freeList[i]);
return sz;
}
}

View File

@ -0,0 +1,35 @@
/**
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.io.hfile.bucket;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
/**
* Thrown by {@link BucketAllocator}
*/
@InterfaceAudience.Private
public class BucketAllocatorException extends IOException {
private static final long serialVersionUID = 2479119906660788096L;
BucketAllocatorException(String reason) {
super(reason);
}
}

View File

@ -0,0 +1,59 @@
/**
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.io.hfile.bucket;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.hfile.CacheStats;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
/**
* Class that implements cache metrics for bucket cache.
*/
@InterfaceAudience.Private
public class BucketCacheStats extends CacheStats {
private final AtomicLong ioHitCount = new AtomicLong(0);
private final AtomicLong ioHitTime = new AtomicLong(0);
private final static int nanoTime = 1000000;
private long lastLogTime = EnvironmentEdgeManager.currentTimeMillis();
public void ioHit(long time) {
ioHitCount.incrementAndGet();
ioHitTime.addAndGet(time);
}
public long getIOHitsPerSecond() {
long now = EnvironmentEdgeManager.currentTimeMillis();
long took = (now - lastLogTime) / 1000;
lastLogTime = now;
return ioHitCount.get() / took;
}
public double getIOTimePerHit() {
long time = ioHitTime.get() / nanoTime;
long count = ioHitCount.get();
return ((float) time / (float) count);
}
public void reset() {
ioHitCount.set(0);
ioHitTime.set(0);
}
}

View File

@ -0,0 +1,100 @@
/**
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.io.hfile.bucket;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.ByteBufferArray;
/**
* IO engine that stores data on the memory using an array of ByteBuffers
* {@link ByteBufferArray}
*/
@InterfaceAudience.Private
public class ByteBufferIOEngine implements IOEngine {
private ByteBufferArray bufferArray;
/**
* Construct the ByteBufferIOEngine with the given capacity
* @param capacity
* @param direct true if allocate direct buffer
* @throws IOException
*/
public ByteBufferIOEngine(long capacity, boolean direct)
throws IOException {
bufferArray = new ByteBufferArray(capacity, direct);
}
/**
* Memory IO engine is always unable to support persistent storage for the
* cache
* @return false
*/
@Override
public boolean isPersistent() {
return false;
}
/**
* Transfers data from the buffer array to the given byte buffer
* @param dstBuffer the given byte buffer into which bytes are to be written
* @param offset The offset in the ByteBufferArray of the first byte to be
* read
* @throws IOException
*/
@Override
public void read(ByteBuffer dstBuffer, long offset) throws IOException {
assert dstBuffer.hasArray();
bufferArray.getMultiple(offset, dstBuffer.remaining(), dstBuffer.array(),
dstBuffer.arrayOffset());
}
/**
* Transfers data from the given byte buffer to the buffer array
* @param srcBuffer the given byte buffer from which bytes are to be read
* @param offset The offset in the ByteBufferArray of the first byte to be
* written
* @throws IOException
*/
@Override
public void write(ByteBuffer srcBuffer, long offset) throws IOException {
assert srcBuffer.hasArray();
bufferArray.putMultiple(offset, srcBuffer.remaining(), srcBuffer.array(),
srcBuffer.arrayOffset());
}
/**
* No operation for the sync in the memory IO engine
*/
@Override
public void sync() {
}
/**
* No operation for the shutdown in the memory IO engine
*/
@Override
public void shutdown() {
}
}

View File

@ -0,0 +1,55 @@
/**
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.io.hfile.bucket;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
/**
* Thrown by {@link BucketAllocator#allocateBlock(int)} when cache is full for
* the requested size
*/
@InterfaceAudience.Private
public class CacheFullException extends IOException {
private static final long serialVersionUID = 3265127301824638920L;
private int requestedSize, bucketIndex;
CacheFullException(int requestedSize, int bucketIndex) {
super();
this.requestedSize = requestedSize;
this.bucketIndex = bucketIndex;
}
public int bucketIndex() {
return bucketIndex;
}
public int requestedSize() {
return requestedSize;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder(1024);
sb.append("Allocator requested size ").append(requestedSize);
sb.append(" for bucket ").append(bucketIndex);
return sb.toString();
}
}

View File

@ -0,0 +1,122 @@
/**
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.io.hfile.bucket;
import java.util.Comparator;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BucketEntry;
import com.google.common.collect.MinMaxPriorityQueue;
/**
* A memory-bound queue that will grow until an element brings total size larger
* than maxSize. From then on, only entries that are sorted larger than the
* smallest current entry will be inserted/replaced.
*
* <p>
* Use this when you want to find the largest elements (according to their
* ordering, not their heap size) that consume as close to the specified maxSize
* as possible. Default behavior is to grow just above rather than just below
* specified max.
*/
@InterfaceAudience.Private
public class CachedEntryQueue {
private MinMaxPriorityQueue<Map.Entry<BlockCacheKey, BucketEntry>> queue;
private long cacheSize;
private long maxSize;
/**
* @param maxSize the target size of elements in the queue
* @param blockSize expected average size of blocks
*/
public CachedEntryQueue(long maxSize, long blockSize) {
int initialSize = (int) (maxSize / blockSize);
if (initialSize == 0)
initialSize++;
queue = MinMaxPriorityQueue
.orderedBy(new Comparator<Map.Entry<BlockCacheKey, BucketEntry>>() {
public int compare(Entry<BlockCacheKey, BucketEntry> entry1,
Entry<BlockCacheKey, BucketEntry> entry2) {
return entry1.getValue().compareTo(entry2.getValue());
}
}).expectedSize(initialSize).create();
cacheSize = 0;
this.maxSize = maxSize;
}
/**
* Attempt to add the specified entry to this queue.
*
* <p>
* If the queue is smaller than the max size, or if the specified element is
* ordered after the smallest element in the queue, the element will be added
* to the queue. Otherwise, there is no side effect of this call.
* @param entry a bucket entry with key to try to add to the queue
*/
public void add(Map.Entry<BlockCacheKey, BucketEntry> entry) {
if (cacheSize < maxSize) {
queue.add(entry);
cacheSize += entry.getValue().getLength();
} else {
BucketEntry head = queue.peek().getValue();
if (entry.getValue().compareTo(head) > 0) {
cacheSize += entry.getValue().getLength();
cacheSize -= head.getLength();
if (cacheSize > maxSize) {
queue.poll();
} else {
cacheSize += head.getLength();
}
queue.add(entry);
}
}
}
/**
* @return The next element in this queue, or {@code null} if the queue is
* empty.
*/
public Map.Entry<BlockCacheKey, BucketEntry> poll() {
return queue.poll();
}
/**
* @return The last element in this queue, or {@code null} if the queue is
* empty.
*/
public Map.Entry<BlockCacheKey, BucketEntry> pollLast() {
return queue.pollLast();
}
/**
* Total size of all elements in this queue.
* @return size of all elements currently in queue, in bytes
*/
public long cacheSize() {
return cacheSize;
}
}

View File

@ -0,0 +1,110 @@
/**
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.io.hfile.bucket;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.util.StringUtils;
/**
* IO engine that stores data to a file on the local file system.
*/
@InterfaceAudience.Private
public class FileIOEngine implements IOEngine {
static final Log LOG = LogFactory.getLog(FileIOEngine.class);
private FileChannel fileChannel = null;
public FileIOEngine(String filePath, long fileSize) throws IOException {
RandomAccessFile raf = null;
try {
raf = new RandomAccessFile(filePath, "rw");
raf.setLength(fileSize);
fileChannel = raf.getChannel();
LOG.info("Allocating " + StringUtils.byteDesc(fileSize)
+ ", on the path:" + filePath);
} catch (java.io.FileNotFoundException fex) {
LOG.error("Can't create bucket cache file " + filePath, fex);
throw fex;
} catch (IOException ioex) {
LOG.error("Can't extend bucket cache file; insufficient space for "
+ StringUtils.byteDesc(fileSize), ioex);
if (raf != null) raf.close();
throw ioex;
}
}
/**
* File IO engine is always able to support persistent storage for the cache
* @return true
*/
@Override
public boolean isPersistent() {
return true;
}
/**
* Transfers data from file to the given byte buffer
* @param dstBuffer the given byte buffer into which bytes are to be written
* @param offset The offset in the file where the first byte to be read
* @throws IOException
*/
@Override
public void read(ByteBuffer dstBuffer, long offset) throws IOException {
fileChannel.read(dstBuffer, offset);
}
/**
* Transfers data from the given byte buffer to file
* @param srcBuffer the given byte buffer from which bytes are to be read
* @param offset The offset in the file where the first byte to be written
* @throws IOException
*/
@Override
public void write(ByteBuffer srcBuffer, long offset) throws IOException {
fileChannel.write(srcBuffer, offset);
}
/**
* Sync the data to file after writing
* @throws IOException
*/
@Override
public void sync() throws IOException {
fileChannel.force(true);
}
/**
* Close the file
*/
@Override
public void shutdown() {
try {
fileChannel.close();
} catch (IOException ex) {
LOG.error("Can't shutdown cleanly", ex);
}
}
}

View File

@ -0,0 +1,65 @@
/**
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.io.hfile.bucket;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience;
/**
* A class implementing IOEngine interface could support data services for
* {@link BucketCache}.
*/
@InterfaceAudience.Private
public interface IOEngine {
/**
* @return true if persistent storage is supported for the cache when shutdown
*/
boolean isPersistent();
/**
* Transfers data from IOEngine to the given byte buffer
* @param dstBuffer the given byte buffer into which bytes are to be written
* @param offset The offset in the IO engine where the first byte to be read
* @throws IOException
*/
void read(ByteBuffer dstBuffer, long offset) throws IOException;
/**
* Transfers data from the given byte buffer to IOEngine
* @param srcBuffer the given byte buffer from which bytes are to be read
* @param offset The offset in the IO engine where the first byte to be
* written
* @throws IOException
*/
void write(ByteBuffer srcBuffer, long offset) throws IOException;
/**
* Sync the data to IOEngine after writing
* @throws IOException
*/
void sync() throws IOException;
/**
* Shutdown the IOEngine
*/
void shutdown();
}

View File

@ -0,0 +1,56 @@
/**
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.io.hfile.bucket;
import java.io.Serializable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.classification.InterfaceAudience;
/**
* Map from type T to int and vice-versa. Used for reducing bit field item
* counts.
*/
@InterfaceAudience.Public
public final class UniqueIndexMap<T> implements Serializable {
private static final long serialVersionUID = -1145635738654002342L;
ConcurrentHashMap<T, Integer> mForwardMap = new ConcurrentHashMap<T, Integer>();
ConcurrentHashMap<Integer, T> mReverseMap = new ConcurrentHashMap<Integer, T>();
AtomicInteger mIndex = new AtomicInteger(0);
// Map a length to an index. If we can't, allocate a new mapping. We might
// race here and get two entries with the same deserialiser. This is fine.
int map(T parameter) {
Integer ret = mForwardMap.get(parameter);
if (ret != null) return ret.intValue();
int nexti = mIndex.incrementAndGet();
assert (nexti < Short.MAX_VALUE);
mForwardMap.put(parameter, nexti);
mReverseMap.put(nexti, parameter);
return nexti;
}
T unmap(int leni) {
Integer len = Integer.valueOf(leni);
assert mReverseMap.containsKey(len);
return mReverseMap.get(len);
}
}

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.MultithreadedTestUtil;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.util.ChecksumType;
public class CacheTestUtils {
@ -149,7 +150,11 @@ public class CacheTestUtils {
try {
if (toBeTested.getBlock(block.blockName, true, false) != null) {
toBeTested.cacheBlock(block.blockName, block.block);
fail("Cache should not allow re-caching a block");
if (!(toBeTested instanceof BucketCache)) {
// BucketCache won't throw exception when caching already cached
// block
fail("Cache should not allow re-caching a block");
}
}
} catch (RuntimeException re) {
// expected
@ -242,6 +247,30 @@ public class CacheTestUtils {
private static class ByteArrayCacheable implements Cacheable {
static final CacheableDeserializer<Cacheable> blockDeserializer =
new CacheableDeserializer<Cacheable>() {
@Override
public Cacheable deserialize(ByteBuffer b) throws IOException {
int len = b.getInt();
Thread.yield();
byte buf[] = new byte[len];
b.get(buf);
return new ByteArrayCacheable(buf);
}
@Override
public int getDeserialiserIdentifier() {
return deserializerIdentifier;
}
@Override
public Cacheable deserialize(ByteBuffer b, boolean reuse)
throws IOException {
return deserialize(b);
}
};
final byte[] buf;
public ByteArrayCacheable(byte[] buf) {
@ -268,20 +297,22 @@ public class CacheTestUtils {
@Override
public CacheableDeserializer<Cacheable> getDeserializer() {
return new CacheableDeserializer<Cacheable>() {
return blockDeserializer;
}
@Override
public Cacheable deserialize(ByteBuffer b) throws IOException {
int len = b.getInt();
Thread.yield();
byte buf[] = new byte[len];
b.get(buf);
return new ByteArrayCacheable(buf);
}
};
private static final int deserializerIdentifier;
static {
deserializerIdentifier = CacheableDeserializerIdManager
.registerDeserializer(blockDeserializer);
}
@Override
public BlockType getBlockType() {
return BlockType.DATA;
}
}
private static HFileBlockPair[] generateHFileBlocks(int blockSize,
int numBlocks) {
HFileBlockPair[] returnedBlocks = new HFileBlockPair[numBlocks];

View File

@ -135,6 +135,11 @@ public class TestCachedBlockQueue extends TestCase {
return null;
}
@Override
public BlockType getBlockType() {
return BlockType.DATA;
}
}, accessTime, false);
}
}

View File

@ -663,6 +663,11 @@ public class TestLruBlockCache {
@Override
public void serialize(ByteBuffer destination) {
}
@Override
public BlockType getBlockType() {
return BlockType.DATA;
}
}

View File

@ -0,0 +1,154 @@
/**
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.io.hfile.bucket;
import static org.junit.Assert.assertTrue;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.BucketSizeInfo;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.IndexStatistics;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
/**
* Basic test of BucketCache.Puts and gets.
* <p>
* Tests will ensure that blocks' data correctness under several threads
* concurrency
*/
@Category(SmallTests.class)
public class TestBucketCache {
static final Log LOG = LogFactory.getLog(TestBucketCache.class);
BucketCache cache;
final int CACHE_SIZE = 1000000;
final int NUM_BLOCKS = 100;
final int BLOCK_SIZE = CACHE_SIZE / NUM_BLOCKS;
final int NUM_THREADS = 1000;
final int NUM_QUERIES = 10000;
final long capacitySize = 32 * 1024 * 1024;
final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS;
final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS;
String ioEngineName = "heap";
String persistencePath = null;
private class MockedBucketCache extends BucketCache {
public MockedBucketCache(String ioEngineName, long capacity,
int writerThreads,
int writerQLen, String persistencePath) throws FileNotFoundException,
IOException {
super(ioEngineName, capacity, writerThreads, writerQLen, persistencePath);
super.wait_when_cache = true;
}
@Override
public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf,
boolean inMemory) {
if (super.getBlock(cacheKey, true, false) != null) {
throw new RuntimeException("Cached an already cached block");
}
super.cacheBlock(cacheKey, buf, inMemory);
}
@Override
public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
if (super.getBlock(cacheKey, true, false) != null) {
throw new RuntimeException("Cached an already cached block");
}
super.cacheBlock(cacheKey, buf);
}
}
@Before
public void setup() throws FileNotFoundException, IOException {
cache = new MockedBucketCache(ioEngineName, capacitySize, writeThreads,
writerQLen, persistencePath);
}
@After
public void tearDown() {
cache.shutdown();
}
@Test
public void testBucketAllocator() throws BucketAllocatorException {
BucketAllocator mAllocator = cache.getAllocator();
/*
* Test the allocator first
*/
int[] blockSizes = new int[2];
blockSizes[0] = 4 * 1024;
blockSizes[1] = 8 * 1024;
boolean full = false;
int i = 0;
ArrayList<Long> allocations = new ArrayList<Long>();
// Fill the allocated extents
while (!full) {
try {
allocations.add(new Long(mAllocator.allocateBlock(blockSizes[i
% blockSizes.length])));
++i;
} catch (CacheFullException cfe) {
full = true;
}
}
for (i = 0; i < blockSizes.length; i++) {
BucketSizeInfo bucketSizeInfo = mAllocator
.roundUpToBucketSizeInfo(blockSizes[0]);
IndexStatistics indexStatistics = bucketSizeInfo.statistics();
assertTrue(indexStatistics.freeCount() == 0);
}
for (long offset : allocations) {
assertTrue(mAllocator.sizeOfAllocation(offset) == mAllocator
.freeBlock(offset));
}
assertTrue(mAllocator.getUsedSize() == 0);
}
@Test
public void testCacheSimple() throws Exception {
CacheTestUtils.testCacheSimple(cache, BLOCK_SIZE, NUM_QUERIES);
}
@Test
public void testCacheMultiThreadedSingleKey() throws Exception {
CacheTestUtils.hammerSingleKey(cache, BLOCK_SIZE, NUM_THREADS, NUM_QUERIES);
}
@Test
public void testHeapSizeChanges() throws Exception {
cache.stopWriterThreads();
CacheTestUtils.testHeapSizeChanges(cache, BLOCK_SIZE);
}
}

View File

@ -0,0 +1,72 @@
/**
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.io.hfile.bucket;
import static org.junit.Assert.assertTrue;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.SmallTests;
import org.junit.Test;
import org.junit.experimental.categories.Category;
/**
* Basic test for {@link ByteBufferIOEngine}
*/
@Category(SmallTests.class)
public class TestByteBufferIOEngine {
@Test
public void testByteBufferIOEngine() throws Exception {
int capacity = 32 * 1024 * 1024; // 32 MB
int testNum = 100;
int maxBlockSize = 64 * 1024;
ByteBufferIOEngine ioEngine = new ByteBufferIOEngine(capacity, false);
int testOffsetAtStartNum = testNum / 10;
int testOffsetAtEndNum = testNum / 10;
for (int i = 0; i < testNum; i++) {
byte val = (byte) (Math.random() * 255);
int blockSize = (int) (Math.random() * maxBlockSize);
byte[] byteArray = new byte[blockSize];
for (int j = 0; j < byteArray.length; ++j) {
byteArray[j] = val;
}
ByteBuffer srcBuffer = ByteBuffer.wrap(byteArray);
int offset = 0;
if (testOffsetAtStartNum > 0) {
testOffsetAtStartNum--;
offset = 0;
} else if (testOffsetAtEndNum > 0) {
testOffsetAtEndNum--;
offset = capacity - blockSize;
} else {
offset = (int) (Math.random() * (capacity - maxBlockSize));
}
ioEngine.write(srcBuffer, offset);
ByteBuffer dstBuffer = ByteBuffer.allocate(blockSize);
ioEngine.read(dstBuffer, offset);
byte[] byteArray2 = dstBuffer.array();
for (int j = 0; j < byteArray.length; ++j) {
assertTrue(byteArray[j] == byteArray2[j]);
}
}
assert testOffsetAtStartNum == 0;
assert testOffsetAtEndNum == 0;
}
}

View File

@ -0,0 +1,65 @@
/**
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.io.hfile.bucket;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.io.hfile.bucket.FileIOEngine;
import org.junit.Test;
import org.junit.experimental.categories.Category;
/**
* Basic test for {@link FileIOEngine}
*/
@Category(SmallTests.class)
public class TestFileIOEngine {
@Test
public void testFileIOEngine() throws IOException {
int size = 2 * 1024 * 1024; // 2 MB
String filePath = "testFileIOEngine";
try {
FileIOEngine fileIOEngine = new FileIOEngine(filePath, size);
for (int i = 0; i < 50; i++) {
int len = (int) Math.floor(Math.random() * 100);
long offset = (long) Math.floor(Math.random() * size % (size - len));
byte[] data1 = new byte[len];
for (int j = 0; j < data1.length; ++j) {
data1[j] = (byte) (Math.random() * 255);
}
byte[] data2 = new byte[len];
fileIOEngine.write(ByteBuffer.wrap(data1), offset);
fileIOEngine.read(ByteBuffer.wrap(data2), offset);
for (int j = 0; j < data1.length; ++j) {
assertTrue(data1[j] == data2[j]);
}
}
} finally {
File file = new File(filePath);
if (file.exists()) {
file.delete();
}
}
}
}