HBASE-4027 Enable direct byte buffers LruBlockCache

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1162207 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2011-08-26 18:53:00 +00:00
parent 5f0953e69e
commit 2bcc605a3f
26 changed files with 1936 additions and 155 deletions

View File

@ -463,6 +463,7 @@ Release 0.91.0 - Unreleased
HBASE-4071 Data GC: Remove all versions > TTL EXCEPT the last HBASE-4071 Data GC: Remove all versions > TTL EXCEPT the last
written version (Lars Hofhansl) written version (Lars Hofhansl)
HBASE-4242 Add documentation for HBASE-4071 (Lars Hofhansl) HBASE-4242 Add documentation for HBASE-4071 (Lars Hofhansl)
HBASE-4027 Enable direct byte buffers LruBlockCache (Li Pi)
Release 0.90.5 - Unreleased Release 0.90.5 - Unreleased

View File

@ -39,6 +39,11 @@ export HBASE_OPTS="-ea -XX:+UseConcMarkSweepGC -XX:+CMSIncrementalMode"
# Uncomment below to enable java garbage collection logging in the .out file. # Uncomment below to enable java garbage collection logging in the .out file.
# export HBASE_OPTS="$HBASE_OPTS -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps" # export HBASE_OPTS="$HBASE_OPTS -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps"
# Uncomment below if you intend to use the EXPERIMENTAL off heap cache.
# export HBASE_OPTS="$HBASE_OPTS -XX:MaxDirectMemorySize="
# Set hbase.offheapcachesize in hbase-site.xml
# Uncomment and adjust to enable JMX exporting # Uncomment and adjust to enable JMX exporting
# See jmxremote.password and jmxremote.access in $JRE_HOME/lib/management to configure remote password access. # See jmxremote.password and jmxremote.access in $JRE_HOME/lib/management to configure remote password access.
# More details at: http://java.sun.com/javase/6/docs/technotes/guides/management/agent.html # More details at: http://java.sun.com/javase/6/docs/technotes/guides/management/agent.html

View File

@ -23,14 +23,10 @@ import java.io.IOException;
import java.util.List; import java.util.List;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.hfile.LruBlockCache.CacheStats;
/** /**
* Block cache interface. Anything that implements the {@link HeapSize} * Block cache interface. Anything that implements the {@link Cacheable}
* interface can be put in the cache, because item size is all the cache * interface can be put in the cache.
* cares about. We might move to a more specialized "cacheable" interface
* in the future.
* *
* TODO: Add filename or hash of filename to block cache key. * TODO: Add filename or hash of filename to block cache key.
*/ */
@ -41,22 +37,22 @@ public interface BlockCache {
* @param buf The block contents wrapped in a ByteBuffer. * @param buf The block contents wrapped in a ByteBuffer.
* @param inMemory Whether block should be treated as in-memory * @param inMemory Whether block should be treated as in-memory
*/ */
public void cacheBlock(String blockName, HeapSize buf, boolean inMemory); public void cacheBlock(String blockName, Cacheable buf, boolean inMemory);
/** /**
* Add block to cache (defaults to not in-memory). * Add block to cache (defaults to not in-memory).
* @param blockName Zero-based file block number. * @param blockName Zero-based file block number.
* @param buf The block contents wrapped in a ByteBuffer. * @param buf The object to cache.
*/ */
public void cacheBlock(String blockName, HeapSize buf); public void cacheBlock(String blockName, Cacheable buf);
/** /**
* Fetch block from cache. * Fetch block from cache.
* @param blockName Block number to fetch. * @param blockName Block number to fetch.
* @param caching Whether this request has caching enabled (used for stats) * @param caching Whether this request has caching enabled (used for stats)
* @return Block or null if block is not in the cache. * @return Block or null if block is not in 2 cache.
*/ */
public HeapSize getBlock(String blockName, boolean caching); public Cacheable getBlock(String blockName, boolean caching);
/** /**
* Evict block from cache. * Evict block from cache.
@ -94,15 +90,15 @@ public interface BlockCache {
public long getCurrentSize(); public long getCurrentSize();
public long getEvictedCount(); public long getEvictedCount();
/** /**
* Performs a BlockCache summary and returns a List of BlockCacheColumnFamilySummary objects. * Performs a BlockCache summary and returns a List of BlockCacheColumnFamilySummary objects.
* This method could be fairly heavyweight in that it evaluates the entire HBase file-system * This method could be fairly heavyweight in that it evaluates the entire HBase file-system
* against what is in the RegionServer BlockCache. * against what is in the RegionServer BlockCache.
* <br><br> * <br><br>
* The contract of this interface is to return the List in sorted order by Table name, then * The contract of this interface is to return the List in sorted order by Table name, then
* ColumnFamily. * ColumnFamily.
* *
* @param conf HBaseConfiguration * @param conf HBaseConfiguration
* @return List of BlockCacheColumnFamilySummary * @return List of BlockCacheColumnFamilySummary
* @throws IOException exception * @throws IOException exception

View File

@ -0,0 +1,118 @@
/**
* Copyright 2011 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile;
import java.util.concurrent.atomic.AtomicLong;
/**
* Class that implements cache metrics.
*/
public class CacheStats {
/** The number of getBlock requests that were cache hits */
private final AtomicLong hitCount = new AtomicLong(0);
/**
* The number of getBlock requests that were cache hits, but only from
* requests that were set to use the block cache. This is because all reads
* attempt to read from the block cache even if they will not put new blocks
* into the block cache. See HBASE-2253 for more information.
*/
private final AtomicLong hitCachingCount = new AtomicLong(0);
/** The number of getBlock requests that were cache misses */
private final AtomicLong missCount = new AtomicLong(0);
/**
* The number of getBlock requests that were cache misses, but only from
* requests that were set to use the block cache.
*/
private final AtomicLong missCachingCount = new AtomicLong(0);
/** The number of times an eviction has occurred */
private final AtomicLong evictionCount = new AtomicLong(0);
/** The total number of blocks that have been evicted */
private final AtomicLong evictedBlockCount = new AtomicLong(0);
public void miss(boolean caching) {
missCount.incrementAndGet();
if (caching) missCachingCount.incrementAndGet();
}
public void hit(boolean caching) {
hitCount.incrementAndGet();
if (caching) hitCachingCount.incrementAndGet();
}
public void evict() {
evictionCount.incrementAndGet();
}
public void evicted() {
evictedBlockCount.incrementAndGet();
}
public long getRequestCount() {
return getHitCount() + getMissCount();
}
public long getRequestCachingCount() {
return getHitCachingCount() + getMissCachingCount();
}
public long getMissCount() {
return missCount.get();
}
public long getMissCachingCount() {
return missCachingCount.get();
}
public long getHitCount() {
return hitCount.get();
}
public long getHitCachingCount() {
return hitCachingCount.get();
}
public long getEvictionCount() {
return evictionCount.get();
}
public long getEvictedCount() {
return evictedBlockCount.get();
}
public double getHitRatio() {
return ((float)getHitCount()/(float)getRequestCount());
}
public double getHitCachingRatio() {
return ((float)getHitCachingCount()/(float)getRequestCachingCount());
}
public double getMissRatio() {
return ((float)getMissCount()/(float)getRequestCount());
}
public double getMissCachingRatio() {
return ((float)getMissCachingCount()/(float)getRequestCachingCount());
}
public double evictedPerEviction() {
return ((float)getEvictedCount()/(float)getEvictionCount());
}
}

View File

@ -0,0 +1,56 @@
/**
* Copyright 2011 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.io.HeapSize;
/**
* Cacheable is an interface that allows for an object to be cached. If using an
* on heap cache, just use heapsize. If using an off heap cache, Cacheable
* provides methods for serialization of the object.
*
* Some objects cannot be moved off heap, those objects will return a
* getSerializedLength() of 0.
*
*/
public interface Cacheable extends HeapSize {
/**
* Returns the length of the ByteBuffer required to serialized the object. If the
* object cannot be serialized, it should also return 0.
*
* @return int length in bytes of the serialized form.
*/
public int getSerializedLength();
/**
* Serializes its data into destination.
*/
public void serialize(ByteBuffer destination);
/**
* Returns CacheableDeserializer instance which reconstructs original object from ByteBuffer.
*
* @return CacheableDeserialzer instance.
*/
public CacheableDeserializer<Cacheable> getDeserializer();
}

View File

@ -0,0 +1,17 @@
package org.apache.hadoop.hbase.io.hfile;
import java.io.IOException;
import java.nio.ByteBuffer;
/**
* Interface for a deserializer. Throws an IOException if the serialized data is
* incomplete or wrong.
* */
public interface CacheableDeserializer<T extends Cacheable> {
/**
* Returns the deserialized object.
*
* @return T the deserialized object.
*/
public T deserialize(ByteBuffer b) throws IOException;
}

View File

@ -19,8 +19,6 @@
*/ */
package org.apache.hadoop.hbase.io.hfile; package org.apache.hadoop.hbase.io.hfile;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.ClassSize;
@ -55,16 +53,16 @@ public class CachedBlock implements HeapSize, Comparable<CachedBlock> {
}; };
private final String blockName; private final String blockName;
private final HeapSize buf; private final Cacheable buf;
private volatile long accessTime; private volatile long accessTime;
private long size; private long size;
private BlockPriority priority; private BlockPriority priority;
public CachedBlock(String blockName, HeapSize buf, long accessTime) { public CachedBlock(String blockName, Cacheable buf, long accessTime) {
this(blockName, buf, accessTime, false); this(blockName, buf, accessTime, false);
} }
public CachedBlock(String blockName, HeapSize buf, long accessTime, public CachedBlock(String blockName, Cacheable buf, long accessTime,
boolean inMemory) { boolean inMemory) {
this.blockName = blockName; this.blockName = blockName;
this.buf = buf; this.buf = buf;
@ -97,7 +95,7 @@ public class CachedBlock implements HeapSize, Comparable<CachedBlock> {
return this.accessTime < that.accessTime ? 1 : -1; return this.accessTime < that.accessTime ? 1 : -1;
} }
public HeapSize getBuffer() { public Cacheable getBuffer() {
return this.buf; return this.buf;
} }

View File

@ -0,0 +1,168 @@
/**
* Copyright 2011 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile;
import java.io.IOException;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.hfile.slab.SlabCache;
import org.apache.hadoop.util.StringUtils;
/**
* DoubleBlockCache is an abstraction layer that combines two caches, the
* smaller onHeapCache and the larger offHeapCache. CacheBlock attempts to cache
* the block in both caches, while readblock reads first from the faster on heap
* cache before looking for the block in the off heap cache. Metrics are the
* combined size and hits and misses of both caches.
*
**/
public class DoubleBlockCache implements BlockCache, HeapSize {
static final Log LOG = LogFactory.getLog(DoubleBlockCache.class.getName());
private final LruBlockCache onHeapCache;
private final SlabCache offHeapCache;
private final CacheStats stats;
/**
* Default constructor. Specify maximum size and expected average block size
* (approximation is fine).
* <p>
* All other factors will be calculated based on defaults specified in this
* class.
*
* @param maxSize
* maximum size of cache, in bytes
* @param blockSize
* approximate size of each block, in bytes
*/
public DoubleBlockCache(long onHeapSize, long offHeapSize, long blockSizeLru,
long blockSizeSlab) {
LOG.info("Creating on-heap cache of size "
+ StringUtils.humanReadableInt(onHeapSize)
+ "bytes with an average block size of "
+ StringUtils.humanReadableInt(blockSizeLru) + " bytes.");
onHeapCache = new LruBlockCache(onHeapSize, blockSizeLru);
LOG.info("Creating off-heap cache of size "
+ StringUtils.humanReadableInt(offHeapSize)
+ "bytes with an average block size of "
+ StringUtils.humanReadableInt(blockSizeSlab) + " bytes.");
offHeapCache = new SlabCache(offHeapSize, blockSizeSlab);
this.stats = new CacheStats();
}
@Override
public void cacheBlock(String blockName, Cacheable buf, boolean inMemory) {
onHeapCache.cacheBlock(blockName, buf, inMemory);
offHeapCache.cacheBlock(blockName, buf);
}
@Override
public void cacheBlock(String blockName, Cacheable buf) {
onHeapCache.cacheBlock(blockName, buf);
offHeapCache.cacheBlock(blockName, buf);
}
@Override
public Cacheable getBlock(String blockName, boolean caching) {
Cacheable cachedBlock;
if ((cachedBlock = onHeapCache.getBlock(blockName, caching)) != null) {
stats.hit(caching);
return cachedBlock;
} else if ((cachedBlock = offHeapCache.getBlock(blockName, caching)) != null) {
if (caching) {
onHeapCache.cacheBlock(blockName, cachedBlock);
}
stats.hit(caching);
return cachedBlock;
}
stats.miss(caching);
return null;
}
@Override
public boolean evictBlock(String blockName) {
stats.evict();
boolean cacheA = onHeapCache.evictBlock(blockName);
boolean cacheB = offHeapCache.evictBlock(blockName);
boolean evicted = cacheA || cacheB;
if (evicted) {
stats.evicted();
}
return evicted;
}
@Override
public CacheStats getStats() {
return this.stats;
}
@Override
public void shutdown() {
onHeapCache.shutdown();
offHeapCache.shutdown();
}
@Override
public long heapSize() {
return onHeapCache.heapSize() + offHeapCache.heapSize();
}
public long size() {
return onHeapCache.size() + offHeapCache.size();
}
public long getFreeSize() {
return onHeapCache.getFreeSize() + offHeapCache.getFreeSize();
}
public long getCurrentSize() {
return onHeapCache.getCurrentSize() + offHeapCache.getCurrentSize();
}
public long getEvictedCount() {
return onHeapCache.getEvictedCount() + offHeapCache.getEvictedCount();
}
@Override
public int evictBlocksByPrefix(String prefix) {
onHeapCache.evictBlocksByPrefix(prefix);
offHeapCache.evictBlocksByPrefix(prefix);
return 0;
}
@Override
public List<BlockCacheColumnFamilySummary> getBlockCacheColumnFamilySummaries(
Configuration conf) throws IOException {
return onHeapCache.getBlockCacheColumnFamilySummaries(conf);
}
}

View File

@ -29,12 +29,10 @@ import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.hbase.io.DoubleOutputStream; import org.apache.hadoop.hbase.io.DoubleOutputStream;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm; import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.ClassSize;
@ -75,7 +73,7 @@ import static org.apache.hadoop.hbase.io.hfile.Compression.Algorithm.NONE;
* The version 2 block representation in the block cache is the same as above, * The version 2 block representation in the block cache is the same as above,
* except that the data section is always uncompressed in the cache. * except that the data section is always uncompressed in the cache.
*/ */
public class HFileBlock implements HeapSize { public class HFileBlock implements Cacheable {
/** The size of a version 2 {@link HFile} block header */ /** The size of a version 2 {@link HFile} block header */
public static final int HEADER_SIZE = MAGIC_LENGTH + 2 * Bytes.SIZEOF_INT public static final int HEADER_SIZE = MAGIC_LENGTH + 2 * Bytes.SIZEOF_INT
@ -87,6 +85,27 @@ public class HFileBlock implements HeapSize {
public static final int BYTE_BUFFER_HEAP_SIZE = (int) ClassSize.estimateBase( public static final int BYTE_BUFFER_HEAP_SIZE = (int) ClassSize.estimateBase(
ByteBuffer.wrap(new byte[0], 0, 0).getClass(), false); ByteBuffer.wrap(new byte[0], 0, 0).getClass(), false);
static final int EXTRA_SERIALIZATION_SPACE = Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT;
private static final CacheableDeserializer<Cacheable> blockDeserializer =
new CacheableDeserializer<Cacheable>() {
public HFileBlock deserialize(ByteBuffer buf) throws IOException{
ByteBuffer tempCopy = buf.duplicate();
ByteBuffer newByteBuffer = ByteBuffer.allocate(tempCopy.limit()
- HFileBlock.EXTRA_SERIALIZATION_SPACE);
tempCopy.limit(tempCopy.limit()
- HFileBlock.EXTRA_SERIALIZATION_SPACE).rewind();
newByteBuffer.put(tempCopy);
HFileBlock ourBuffer = new HFileBlock(newByteBuffer);
tempCopy.position(tempCopy.limit());
tempCopy.limit(tempCopy.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE);
ourBuffer.offset = tempCopy.getLong();
ourBuffer.nextBlockOnDiskSizeWithHeader = tempCopy.getInt();
return ourBuffer;
}
};
private BlockType blockType; private BlockType blockType;
private final int onDiskSizeWithoutHeader; private final int onDiskSizeWithoutHeader;
private final int uncompressedSizeWithoutHeader; private final int uncompressedSizeWithoutHeader;
@ -398,9 +417,20 @@ public class HFileBlock implements HeapSize {
// uncompressed size, next block's on-disk size, offset and previous // uncompressed size, next block's on-disk size, offset and previous
// offset, byte buffer object, and its byte array. Might also need to add // offset, byte buffer object, and its byte array. Might also need to add
// some fields inside the byte buffer. // some fields inside the byte buffer.
return ClassSize.align(ClassSize.OBJECT + 2 * ClassSize.REFERENCE + 3
* Bytes.SIZEOF_INT + 2 * Bytes.SIZEOF_LONG + BYTE_BUFFER_HEAP_SIZE) + // We only add one BYTE_BUFFER_HEAP_SIZE because at any given moment, one of
ClassSize.align(buf.capacity()); // the bytebuffers will be null. But we do account for both references.
// If we are on heap, then we add the capacity of buf.
if (buf != null) {
return ClassSize.align(ClassSize.OBJECT + 2 * ClassSize.REFERENCE + 3
* Bytes.SIZEOF_INT + 2 * Bytes.SIZEOF_LONG + BYTE_BUFFER_HEAP_SIZE)
+ ClassSize.align(buf.capacity());
} else {
return ClassSize.align(ClassSize.OBJECT + 2 * ClassSize.REFERENCE + 3
* Bytes.SIZEOF_INT + 2 * Bytes.SIZEOF_LONG + BYTE_BUFFER_HEAP_SIZE);
}
} }
/** /**
@ -1438,4 +1468,70 @@ public class HFileBlock implements HeapSize {
} }
} @Override
public int getSerializedLength() {
if (buf != null) {
return this.buf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE;
}
return 0;
}
@Override
public void serialize(ByteBuffer destination) {
destination.put(this.buf.duplicate());
destination.putLong(this.offset);
destination.putInt(this.nextBlockOnDiskSizeWithHeader);
destination.rewind();
}
@Override
public CacheableDeserializer<Cacheable> getDeserializer() {
return HFileBlock.blockDeserializer;
}
@Override
public boolean equals(Object comparison) {
if (this == comparison) {
return true;
}
if (comparison == null) {
return false;
}
if (comparison.getClass() != this.getClass()) {
return false;
}
HFileBlock castedComparison = (HFileBlock) comparison;
if (castedComparison.blockType != this.blockType) {
return false;
}
if (castedComparison.nextBlockOnDiskSizeWithHeader != this.nextBlockOnDiskSizeWithHeader) {
return false;
}
if (castedComparison.offset != this.offset) {
return false;
}
if (castedComparison.onDiskSizeWithoutHeader != this.onDiskSizeWithoutHeader) {
return false;
}
if (castedComparison.prevBlockOffset != this.prevBlockOffset) {
return false;
}
if (castedComparison.uncompressedSizeWithoutHeader != this.uncompressedSizeWithoutHeader) {
return false;
}
if (this.buf.compareTo(castedComparison.buf) != 0) {
return false;
}
if (this.buf.position() != castedComparison.buf.position()){
return false;
}
if (this.buf.limit() != castedComparison.buf.limit()){
return false;
}
return true;
}
}

View File

@ -261,7 +261,7 @@ public class LruBlockCache implements BlockCache, HeapSize {
* @param buf block buffer * @param buf block buffer
* @param inMemory if block is in-memory * @param inMemory if block is in-memory
*/ */
public void cacheBlock(String blockName, HeapSize buf, boolean inMemory) { public void cacheBlock(String blockName, Cacheable buf, boolean inMemory) {
CachedBlock cb = map.get(blockName); CachedBlock cb = map.get(blockName);
if(cb != null) { if(cb != null) {
throw new RuntimeException("Cached an already cached block"); throw new RuntimeException("Cached an already cached block");
@ -285,7 +285,7 @@ public class LruBlockCache implements BlockCache, HeapSize {
* @param blockName block name * @param blockName block name
* @param buf block buffer * @param buf block buffer
*/ */
public void cacheBlock(String blockName, HeapSize buf) { public void cacheBlock(String blockName, Cacheable buf) {
cacheBlock(blockName, buf, false); cacheBlock(blockName, buf, false);
} }
@ -294,7 +294,7 @@ public class LruBlockCache implements BlockCache, HeapSize {
* @param blockName block name * @param blockName block name
* @return buffer of specified block name, or null if not in cache * @return buffer of specified block name, or null if not in cache
*/ */
public HeapSize getBlock(String blockName, boolean caching) { public Cacheable getBlock(String blockName, boolean caching) {
CachedBlock cb = map.get(blockName); CachedBlock cb = map.get(blockName);
if(cb == null) { if(cb == null) {
stats.miss(caching); stats.miss(caching);
@ -624,100 +624,7 @@ public class LruBlockCache implements BlockCache, HeapSize {
public CacheStats getStats() { public CacheStats getStats() {
return this.stats; return this.stats;
} }
public static class CacheStats {
/** The number of getBlock requests that were cache hits */
private final AtomicLong hitCount = new AtomicLong(0);
/**
* The number of getBlock requests that were cache hits, but only from
* requests that were set to use the block cache. This is because all reads
* attempt to read from the block cache even if they will not put new blocks
* into the block cache. See HBASE-2253 for more information.
*/
private final AtomicLong hitCachingCount = new AtomicLong(0);
/** The number of getBlock requests that were cache misses */
private final AtomicLong missCount = new AtomicLong(0);
/**
* The number of getBlock requests that were cache misses, but only from
* requests that were set to use the block cache.
*/
private final AtomicLong missCachingCount = new AtomicLong(0);
/** The number of times an eviction has occurred */
private final AtomicLong evictionCount = new AtomicLong(0);
/** The total number of blocks that have been evicted */
private final AtomicLong evictedCount = new AtomicLong(0);
public void miss(boolean caching) {
missCount.incrementAndGet();
if (caching) missCachingCount.incrementAndGet();
}
public void hit(boolean caching) {
hitCount.incrementAndGet();
if (caching) hitCachingCount.incrementAndGet();
}
public void evict() {
evictionCount.incrementAndGet();
}
public void evicted() {
evictedCount.incrementAndGet();
}
public long getRequestCount() {
return getHitCount() + getMissCount();
}
public long getRequestCachingCount() {
return getHitCachingCount() + getMissCachingCount();
}
public long getMissCount() {
return missCount.get();
}
public long getMissCachingCount() {
return missCachingCount.get();
}
public long getHitCount() {
return hitCount.get();
}
public long getHitCachingCount() {
return hitCachingCount.get();
}
public long getEvictionCount() {
return evictionCount.get();
}
public long getEvictedCount() {
return evictedCount.get();
}
public double getHitRatio() {
return ((float)getHitCount()/(float)getRequestCount());
}
public double getHitCachingRatio() {
return ((float)getHitCachingCount()/(float)getRequestCachingCount());
}
public double getMissRatio() {
return ((float)getMissCount()/(float)getRequestCount());
}
public double getMissCachingRatio() {
return ((float)getMissCachingCount()/(float)getRequestCachingCount());
}
public double evictedPerEviction() {
return ((float)getEvictedCount()/(float)getEvictionCount());
}
}
public final static long CACHE_FIXED_OVERHEAD = ClassSize.align( public final static long CACHE_FIXED_OVERHEAD = ClassSize.align(
(3 * Bytes.SIZEOF_LONG) + (8 * ClassSize.REFERENCE) + (3 * Bytes.SIZEOF_LONG) + (8 * ClassSize.REFERENCE) +
(5 * Bytes.SIZEOF_FLOAT) + Bytes.SIZEOF_BOOLEAN (5 * Bytes.SIZEOF_FLOAT) + Bytes.SIZEOF_BOOLEAN

View File

@ -26,17 +26,15 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.hfile.LruBlockCache.CacheStats;
/** /**
* Simple one RFile soft reference cache. * Simple one RFile soft reference cache.
*/ */
public class SimpleBlockCache implements BlockCache { public class SimpleBlockCache implements BlockCache {
private static class Ref extends SoftReference<HeapSize> { private static class Ref extends SoftReference<Cacheable> {
public String blockId; public String blockId;
public Ref(String blockId, HeapSize block, ReferenceQueue q) { public Ref(String blockId, Cacheable block, ReferenceQueue q) {
super(block, q); super(block, q);
this.blockId = blockId; this.blockId = blockId;
} }
@ -70,7 +68,7 @@ public class SimpleBlockCache implements BlockCache {
return cache.size(); return cache.size();
} }
public synchronized HeapSize getBlock(String blockName, boolean caching) { public synchronized Cacheable getBlock(String blockName, boolean caching) {
processQueue(); // clear out some crap. processQueue(); // clear out some crap.
Ref ref = cache.get(blockName); Ref ref = cache.get(blockName);
if (ref == null) if (ref == null)
@ -78,11 +76,11 @@ public class SimpleBlockCache implements BlockCache {
return ref.get(); return ref.get();
} }
public synchronized void cacheBlock(String blockName, HeapSize block) { public synchronized void cacheBlock(String blockName, Cacheable block) {
cache.put(blockName, new Ref(blockName, block, q)); cache.put(blockName, new Ref(blockName, block, q));
} }
public synchronized void cacheBlock(String blockName, HeapSize block, public synchronized void cacheBlock(String blockName, Cacheable block,
boolean inMemory) { boolean inMemory) {
cache.put(blockName, new Ref(blockName, block, q)); cache.put(blockName, new Ref(blockName, block, q));
} }
@ -124,7 +122,7 @@ public class SimpleBlockCache implements BlockCache {
public int evictBlocksByPrefix(String string) { public int evictBlocksByPrefix(String string) {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override @Override
public List<BlockCacheColumnFamilySummary> getBlockCacheColumnFamilySummaries(Configuration conf) { public List<BlockCacheColumnFamilySummary> getBlockCacheColumnFamilySummaries(Configuration conf) {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();

View File

@ -0,0 +1,306 @@
/**
* Copyright 2011 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile.slab;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheColumnFamilySummary;
import org.apache.hadoop.hbase.io.hfile.CacheStats;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.util.StringUtils;
import com.google.common.collect.MapEvictionListener;
import com.google.common.collect.MapMaker;
/**
* SingleSizeCache is a slab allocated cache that caches elements up to a single
* size. It uses a slab allocator (Slab.java) to divide a direct bytebuffer,
* into evenly sized blocks. Any cached data will take up exactly 1 block. An
* exception will be thrown if the cached data cannot fit into the blockSize of
* this SingleSizeCache.
*
* Eviction and LRUness is taken care of by Guava's MapMaker, which creates a
* ConcurrentLinkedHashMap.
*
**/
public class SingleSizeCache implements BlockCache {
private final Slab backingStore;
private final ConcurrentMap<String, CacheablePair> backingMap;
private final int numBlocks;
private final int blockSize;
private final CacheStats stats;
private final SlabItemEvictionWatcher evictionWatcher;
private AtomicLong size;
public final static long CACHE_FIXED_OVERHEAD = ClassSize
.align((2 * Bytes.SIZEOF_INT) + (5 * ClassSize.REFERENCE)
+ +ClassSize.OBJECT);
static final Log LOG = LogFactory.getLog(SingleSizeCache.class);
/**
* Default constructor. Specify the size of the blocks, number of blocks, and
* the SlabCache this cache will be assigned to.
*
*
* @param blockSize the size of each block, in bytes
*
* @param numBlocks the number of blocks of blockSize this cache will hold.
*
* @param master the SlabCache this SingleSlabCache is assigned to.
*/
public SingleSizeCache(int blockSize, int numBlocks,
SlabItemEvictionWatcher master) {
this.blockSize = blockSize;
this.numBlocks = numBlocks;
backingStore = new Slab(blockSize, numBlocks);
this.stats = new CacheStats();
this.evictionWatcher = master;
this.size = new AtomicLong(CACHE_FIXED_OVERHEAD + backingStore.heapSize());
// This evictionListener is called whenever the cache automatically evicts
// something.
MapEvictionListener<String, CacheablePair> listener = new MapEvictionListener<String, CacheablePair>() {
@Override
public void onEviction(String key, CacheablePair value) {
try {
value.evictionLock.writeLock().lock();
backingStore.free(value.serializedData);
stats.evict();
/**
* We may choose to run this cache alone, without the SlabCache on
* top, no evictionWatcher in that case
*/
if (evictionWatcher != null) {
evictionWatcher.onEviction(key, false);
}
size.addAndGet(-1 * value.heapSize());
stats.evicted();
} finally {
value.evictionLock.writeLock().unlock();
}
}
};
backingMap = new MapMaker().maximumSize(numBlocks - 1)
.evictionListener(listener).makeMap();
}
@Override
public synchronized void cacheBlock(String blockName, Cacheable toBeCached) {
ByteBuffer storedBlock = backingStore.alloc(toBeCached
.getSerializedLength());
CacheablePair newEntry = new CacheablePair(toBeCached.getDeserializer(),
storedBlock);
CacheablePair alreadyCached = backingMap.putIfAbsent(blockName, newEntry);
if (alreadyCached != null) {
backingStore.free(storedBlock);
throw new RuntimeException("already cached " + blockName);
}
toBeCached.serialize(storedBlock);
this.size.addAndGet(newEntry.heapSize());
}
@Override
public Cacheable getBlock(String key, boolean caching) {
CacheablePair contentBlock = backingMap.get(key);
if (contentBlock == null) {
stats.miss(caching);
return null;
}
stats.hit(caching);
// If lock cannot be obtained, that means we're undergoing eviction.
if (contentBlock.evictionLock.readLock().tryLock()) {
try {
return contentBlock.deserializer
.deserialize(contentBlock.serializedData);
} catch (IOException e) {
e.printStackTrace();
LOG.warn("Deserializer throwing ioexception, possibly deserializing wrong object buffer");
return null;
} finally {
contentBlock.evictionLock.readLock().unlock();
}
}
return null;
}
/**
* Evicts the block
*
* @param key the key of the entry we are going to evict
* @return the evicted ByteBuffer
*/
public boolean evictBlock(String key) {
stats.evict();
CacheablePair evictedBlock = backingMap.remove(key);
if (evictedBlock != null) {
try {
evictedBlock.evictionLock.writeLock().lock();
backingStore.free(evictedBlock.serializedData);
evictionWatcher.onEviction(key, false);
stats.evicted();
size.addAndGet(-1 * evictedBlock.heapSize());
} finally {
evictedBlock.evictionLock.writeLock().unlock();
}
}
return evictedBlock != null;
}
public void logStats() {
LOG.info("For Slab of size " + this.blockSize + ": "
+ this.getOccupiedSize() / this.blockSize
+ " occupied, out of a capacity of " + this.numBlocks
+ " blocks. HeapSize is "
+ StringUtils.humanReadableInt(this.heapSize()) + " bytes.");
LOG.debug("Slab Stats: " + "accesses="
+ stats.getRequestCount()
+ ", "
+ "hits="
+ stats.getHitCount()
+ ", "
+ "hitRatio="
+ (stats.getHitCount() == 0 ? "0" : (StringUtils.formatPercent(
stats.getHitRatio(), 2) + "%, "))
+ "cachingAccesses="
+ stats.getRequestCachingCount()
+ ", "
+ "cachingHits="
+ stats.getHitCachingCount()
+ ", "
+ "cachingHitsRatio="
+ (stats.getHitCachingCount() == 0 ? "0" : (StringUtils.formatPercent(
stats.getHitCachingRatio(), 2) + "%, ")) + "evictions="
+ stats.getEvictionCount() + ", " + "evicted="
+ stats.getEvictedCount() + ", " + "evictedPerRun="
+ stats.evictedPerEviction());
}
public void shutdown() {
backingStore.shutdown();
}
public long heapSize() {
return this.size() + backingStore.heapSize();
}
public long size() {
return this.blockSize * this.numBlocks;
}
public long getFreeSize() {
return backingStore.getBlocksRemaining() * blockSize;
}
public long getOccupiedSize() {
return (numBlocks - backingStore.getBlocksRemaining()) * blockSize;
}
public long getEvictedCount() {
return stats.getEvictedCount();
}
public CacheStats getStats() {
return this.stats;
}
/* Since its offheap, it doesn't matter if its in memory or not */
@Override
public void cacheBlock(String blockName, Cacheable buf, boolean inMemory) {
this.cacheBlock(blockName, buf);
}
/*
* This is never called, as evictions are handled in the SlabCache layer,
* implemented in the event we want to use this as a standalone cache.
*/
@Override
public int evictBlocksByPrefix(String prefix) {
int evictedCount = 0;
for (String e : backingMap.keySet()) {
if (e.startsWith(prefix)) {
this.evictBlock(e);
}
}
return evictedCount;
}
@Override
public long getCurrentSize() {
return 0;
}
/*
* Not implemented. Extremely costly to do this from the off heap cache, you'd
* need to copy every object on heap once
*/
@Override
public List<BlockCacheColumnFamilySummary> getBlockCacheColumnFamilySummaries(
Configuration conf) {
throw new UnsupportedOperationException();
}
/* Just a pair class, holds a reference to the parent cacheable */
private class CacheablePair implements HeapSize {
final CacheableDeserializer<Cacheable> deserializer;
final ByteBuffer serializedData;
final ReentrantReadWriteLock evictionLock;
private CacheablePair(CacheableDeserializer<Cacheable> deserializer,
ByteBuffer serializedData) {
this.deserializer = deserializer;
this.serializedData = serializedData;
evictionLock = new ReentrantReadWriteLock();
}
/*
* Heapsize overhead of this is the default object overhead, the heapsize of
* the serialized object, and the cost of a reference to the bytebuffer,
* which is already accounted for in SingleSizeCache
*/
@Override
public long heapSize() {
return ClassSize.align(ClassSize.OBJECT + ClassSize.REFERENCE * 3
+ ClassSize.REENTRANT_LOCK);
}
}
}

View File

@ -0,0 +1,131 @@
/**
* Copyright 2011 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile.slab;
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.DirectMemoryUtils;
import com.google.common.base.Preconditions;
/**
* Slab is a class which is designed to allocate blocks of a certain size.
* Constructor creates a number of DirectByteBuffers and slices them into the
* requisite size, then puts them all in a buffer.
**/
class Slab implements org.apache.hadoop.hbase.io.HeapSize {
static final Log LOG = LogFactory.getLog(Slab.class);
/** This is where our items, or blocks of the slab, are stored. */
private ConcurrentLinkedQueue<ByteBuffer> buffers;
/** This is where our Slabs are stored */
private ConcurrentLinkedQueue<ByteBuffer> slabs;
private final int blockSize;
private final int numBlocks;
private long heapSize;
Slab(int blockSize, int numBlocks) {
buffers = new ConcurrentLinkedQueue<ByteBuffer>();
slabs = new ConcurrentLinkedQueue<ByteBuffer>();
this.blockSize = blockSize;
this.numBlocks = numBlocks;
this.heapSize = ClassSize.estimateBase(this.getClass(), false);
int maxBlocksPerSlab = Integer.MAX_VALUE / blockSize;
int maxSlabSize = maxBlocksPerSlab * blockSize;
int numFullSlabs = numBlocks / maxBlocksPerSlab;
int partialSlabSize = (numBlocks % maxBlocksPerSlab) * blockSize;
for (int i = 0; i < numFullSlabs; i++) {
allocateAndSlice(maxSlabSize, blockSize);
}
if (partialSlabSize > 0) {
allocateAndSlice(partialSlabSize, blockSize);
}
}
private void allocateAndSlice(int size, int sliceSize) {
ByteBuffer newSlab = ByteBuffer.allocateDirect(size);
slabs.add(newSlab);
for (int j = 0; j < newSlab.capacity(); j += sliceSize) {
newSlab.limit(j + sliceSize).position(j);
ByteBuffer aSlice = newSlab.slice();
buffers.add(aSlice);
heapSize += ClassSize.estimateBase(aSlice.getClass(), false);
}
}
/*
* Shutdown deallocates the memory for all the DirectByteBuffers. Each
* DirectByteBuffer has a "cleaner" method, which is similar to a
* deconstructor in C++.
*/
void shutdown() {
for (ByteBuffer aSlab : slabs) {
try {
DirectMemoryUtils.destroyDirectByteBuffer(aSlab);
} catch (Exception e) {
LOG.warn("Unable to deallocate direct memory during shutdown", e);
}
}
}
int getBlockSize() {
return this.blockSize;
}
int getBlockCapacity() {
return this.numBlocks;
}
int getBlocksRemaining() {
return this.buffers.size();
}
/*
* This spinlocks if empty. Make sure your program can deal with that, and
* will complete eviction on time.
*/
ByteBuffer alloc(int bufferSize) {
int newCapacity = Preconditions.checkPositionIndex(bufferSize, blockSize);
while (buffers.isEmpty()); // Spinlock
ByteBuffer returnedBuffer = buffers.remove();
returnedBuffer.clear().limit(newCapacity);
return returnedBuffer;
}
void free(ByteBuffer toBeFreed) {
Preconditions.checkArgument(toBeFreed.capacity() == blockSize);
buffers.add(toBeFreed);
}
@Override
public long heapSize() {
return heapSize;
}
}

View File

@ -0,0 +1,392 @@
/**
* Copyright 2011 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile.slab;
import java.math.BigDecimal;
import java.util.Map.Entry;
import java.util.List;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheColumnFamilySummary;
import org.apache.hadoop.hbase.io.hfile.CacheStats;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.util.StringUtils;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* SlabCache is composed of multiple SingleSizeCaches. It uses a TreeMap in
* order to determine where a given element fits. Redirects gets and puts to the
* correct SingleSizeCache.
*
**/
public class SlabCache implements SlabItemEvictionWatcher, BlockCache, HeapSize {
private final ConcurrentHashMap<String, SingleSizeCache> backingStore;
private final TreeMap<Integer, SingleSizeCache> sizer;
static final Log LOG = LogFactory.getLog(SlabCache.class);
static final int STAT_THREAD_PERIOD_SECS = 60 * 5;
private final ScheduledExecutorService scheduleThreadPool = Executors
.newScheduledThreadPool(1,
new ThreadFactoryBuilder().setNameFormat("Slab Statistics #%d")
.build());
long size;
private final CacheStats stats;
final SlabStats slabstats;
private final long avgBlockSize;
private static final long CACHE_FIXED_OVERHEAD = ClassSize.estimateBase(
SlabCache.class, false);
/**
* Default constructor, creates an empty SlabCache.
*
* @param size Total size allocated to the SlabCache. (Bytes)
* @param avgBlockSize Average size of a block being cached.
**/
public SlabCache(long size, long avgBlockSize) {
this.avgBlockSize = avgBlockSize;
this.size = size;
this.stats = new CacheStats();
this.slabstats = new SlabStats();
backingStore = new ConcurrentHashMap<String, SingleSizeCache>();
sizer = new TreeMap<Integer, SingleSizeCache>();
this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this),
STAT_THREAD_PERIOD_SECS, STAT_THREAD_PERIOD_SECS, TimeUnit.SECONDS);
}
/**
* A way of allocating the desired amount of Slabs of each particular size.
*
* This reads two lists from conf, hbase.offheap.slab.proportions and
* hbase.offheap.slab.sizes.
*
* The first list is the percentage of our total space we allocate to the
* slabs.
*
* The second list is blocksize of the slabs in bytes. (E.g. the slab holds
* blocks of this size).
*
* @param Configuration file.
*/
public void addSlabByConf(Configuration conf) {
// Proportions we allocate to each slab of the total size.
String[] porportions = conf.getStrings(
"hbase.offheapcache.slab.proportions", "0.80", "0.20");
String[] sizes = conf.getStrings("hbase.offheapcache.slab.sizes", new Long(
avgBlockSize * 11 / 10).toString(), new Long(avgBlockSize * 21 / 10)
.toString());
if (porportions.length != sizes.length) {
throw new IllegalArgumentException(
"SlabCache conf not "
+ "initialized, error in configuration. hbase.offheap.slab.proportions specifies "
+ porportions.length
+ " slabs while hbase.offheap.slab.sizes specifies "
+ sizes.length + " slabs "
+ "offheapslabporportions and offheapslabsizes");
}
/* We use BigDecimals instead of floats because float rounding is annoying */
BigDecimal[] parsedProportions = stringArrayToBigDecimalArray(porportions);
BigDecimal[] parsedSizes = stringArrayToBigDecimalArray(sizes);
BigDecimal sumProportions = new BigDecimal(0);
for (BigDecimal b : parsedProportions) {
/* Make sure all proportions are greater than 0 */
Preconditions
.checkArgument(b.compareTo(BigDecimal.ZERO) == 1,
"Proportions in hbase.offheap.slab.proportions must be greater than 0!");
sumProportions = sumProportions.add(b);
}
/* If the sum is greater than 1 */
Preconditions
.checkArgument(sumProportions.compareTo(BigDecimal.ONE) != 1,
"Sum of all proportions in hbase.offheap.slab.proportions must be less than 1");
/* If the sum of all proportions is less than 0.99 */
if (sumProportions.compareTo(new BigDecimal("0.99")) == -1) {
LOG.warn("Sum of hbase.offheap.slab.proportions is less than 0.99! Memory is being wasted");
}
for (int i = 0; i < parsedProportions.length; i++) {
int blockSize = parsedSizes[i].intValue();
int numBlocks = new BigDecimal(this.size).multiply(parsedProportions[i])
.divide(parsedSizes[i], BigDecimal.ROUND_DOWN).intValue();
addSlab(blockSize, numBlocks);
}
}
/**
* Gets the size of the slab cache a ByteBuffer of this size would be
* allocated to.
*
* @param size Size of the ByteBuffer we are checking.
*
* @return the Slab that the above bytebuffer would be allocated towards. If
* object is too large, returns null.
*/
Entry<Integer, SingleSizeCache> getHigherBlock(int size) {
return sizer.higherEntry(size - 1);
}
private BigDecimal[] stringArrayToBigDecimalArray(String[] parsee) {
BigDecimal[] parsed = new BigDecimal[parsee.length];
for (int i = 0; i < parsee.length; i++) {
parsed[i] = new BigDecimal(parsee[i].trim());
}
return parsed;
}
private void addSlab(int blockSize, int numBlocks) {
sizer.put(blockSize, new SingleSizeCache(blockSize, numBlocks, this));
}
/**
* Cache the block with the specified name and buffer. First finds what size
* SingleSlabCache it should fit in. If the block doesn't fit in any, it will
* return without doing anything.
* <p>
* It is assumed this will NEVER be called on an already cached block. If that
* is done, it is assumed that you are reinserting the same exact block due to
* a race condition, and will throw a runtime exception.
*
* @param blockName block name
* @param cachedItem block buffer
*/
public void cacheBlock(String blockName, Cacheable cachedItem) {
Entry<Integer, SingleSizeCache> scacheEntry = getHigherBlock(cachedItem
.getSerializedLength());
this.slabstats.addin(cachedItem.getSerializedLength());
if (scacheEntry == null) {
return; // we can't cache, something too big.
}
SingleSizeCache scache = scacheEntry.getValue();
scache.cacheBlock(blockName, cachedItem); // if this
// fails, due to
// block already
// being there, exception will be thrown
backingStore.put(blockName, scache);
}
/**
* We don't care about whether its in memory or not, so we just pass the call
* through.
*/
public void cacheBlock(String blockName, Cacheable buf, boolean inMemory) {
cacheBlock(blockName, buf);
}
public CacheStats getStats() {
return this.stats;
}
/**
* Get the buffer of the block with the specified name.
*
* @param blockName block name
* @return buffer of specified block name, or null if not in cache
*/
public Cacheable getBlock(String key, boolean caching) {
SingleSizeCache cachedBlock = backingStore.get(key);
if (cachedBlock == null) {
return null;
}
Cacheable contentBlock = cachedBlock.getBlock(key, caching);
if (contentBlock != null) {
stats.hit(caching);
} else {
stats.miss(caching);
}
return contentBlock;
}
/**
* Evicts a block from the cache. This is public, and thus contributes to the
* the evict counter.
*/
public boolean evictBlock(String key) {
stats.evict();
return onEviction(key, true);
}
@Override
public boolean onEviction(String key, boolean callAssignedCache) {
SingleSizeCache cacheEntry = backingStore.remove(key);
if (cacheEntry == null) {
return false;
}
/* we need to bump up stats.evict, as this call came from the assignedCache. */
if (callAssignedCache == false) {
stats.evict();
}
stats.evicted();
if (callAssignedCache) {
cacheEntry.evictBlock(key);
}
return true;
}
/**
* Sends a shutdown to all SingleSizeCache's contained by this cache.F
*/
public void shutdown() {
for (SingleSizeCache s : sizer.values()) {
s.shutdown();
}
}
public long heapSize() {
long childCacheSize = 0;
for (SingleSizeCache s : sizer.values()) {
childCacheSize += s.heapSize();
}
return SlabCache.CACHE_FIXED_OVERHEAD + childCacheSize;
}
public long size() {
return this.size;
}
public long getFreeSize() {
return 0; // this cache, by default, allocates all its space.
}
public long getCurrentSize() {
return size;
}
public long getEvictedCount() {
return stats.getEvictedCount();
}
/*
* Statistics thread. Periodically prints the cache statistics to the log.
*/
static class StatisticsThread extends Thread {
SlabCache ourcache;
public StatisticsThread(SlabCache slabCache) {
super("SlabCache.StatisticsThread");
setDaemon(true);
this.ourcache = slabCache;
}
@Override
public void run() {
ourcache.slabstats.logStats(ourcache);
}
}
/**
* Just like CacheStats, but more Slab specific. Finely grained profiling of
* sizes we store using logs.
*
*/
static class SlabStats {
// the maximum size somebody will ever try to cache, then we multiply by 10
// so we have finer grained stats.
private final int MULTIPLIER = 10;
private final int NUMDIVISIONS = (int) (Math.log(Integer.MAX_VALUE) * MULTIPLIER);
private final AtomicLong[] counts = new AtomicLong[NUMDIVISIONS];
public SlabStats() {
for (int i = 0; i < NUMDIVISIONS; i++) {
counts[i] = new AtomicLong();
}
}
public void addin(int size) {
int index = (int) (Math.log(size) * MULTIPLIER);
counts[index].incrementAndGet();
}
public AtomicLong[] getUsage() {
return counts;
}
public void logStats(SlabCache slabCache) {
for (SingleSizeCache s : slabCache.sizer.values()) {
s.logStats();
}
AtomicLong[] fineGrainedStats = getUsage();
int multiplier = MULTIPLIER;
SlabCache.LOG.info("Current heap size is: "
+ StringUtils.humanReadableInt(slabCache.heapSize()));
for (int i = 0; i < fineGrainedStats.length; i++) {
double lowerbound = Math.pow(Math.E, (double) i / (double) multiplier
- 0.5);
double upperbound = Math.pow(Math.E, (double) i / (double) multiplier
+ 0.5);
SlabCache.LOG.info("From "
+ StringUtils.humanReadableInt((long) lowerbound) + "- "
+ StringUtils.humanReadableInt((long) upperbound) + ": "
+ StringUtils.humanReadableInt(fineGrainedStats[i].get())
+ " requests");
}
}
}
public int evictBlocksByPrefix(String prefix) {
int numEvicted = 0;
for (String key : backingStore.keySet()) {
if (key.startsWith(prefix)) {
if (evictBlock(key))
++numEvicted;
}
}
return numEvicted;
}
/*
* Not implemented. Extremely costly to do this from the off heap cache, you'd
* need to copy every object on heap once
*/
@Override
public List<BlockCacheColumnFamilySummary> getBlockCacheColumnFamilySummaries(
Configuration conf) {
throw new UnsupportedOperationException();
}
}

View File

@ -0,0 +1,38 @@
/**
* Copyright 2011 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile.slab;
/**
* Interface for objects that want to know when an eviction occurs.
* */
interface SlabItemEvictionWatcher {
/**
* This is called as a callback by the EvictionListener in each of the
* SingleSizeSlabCaches.
*
* @param key the key of the item being evicted
* @param boolean callAssignedCache whether we should call the cache which the
* key was originally assigned to.
*/
boolean onEviction(String key, boolean callAssignedCache);
}

View File

@ -99,7 +99,8 @@ import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.WritableByteArrayComparable; import org.apache.hadoop.hbase.filter.WritableByteArrayComparable;
import org.apache.hadoop.hbase.io.hfile.BlockCache; import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheColumnFamilySummary; import org.apache.hadoop.hbase.io.hfile.BlockCacheColumnFamilySummary;
import org.apache.hadoop.hbase.io.hfile.LruBlockCache.CacheStats; import org.apache.hadoop.hbase.io.hfile.CacheStats;
import org.apache.hadoop.hbase.io.hfile.LruBlockCache;
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol; import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
import org.apache.hadoop.hbase.ipc.HBaseRPC; import org.apache.hadoop.hbase.ipc.HBaseRPC;
import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler; import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
@ -630,9 +631,9 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
closeUserRegions(this.abortRequested); closeUserRegions(this.abortRequested);
} else if (this.stopping) { } else if (this.stopping) {
LOG.info("Stopping meta regions, if the HRegionServer hosts any"); LOG.info("Stopping meta regions, if the HRegionServer hosts any");
boolean allUserRegionsOffline = areAllUserRegionsOffline(); boolean allUserRegionsOffline = areAllUserRegionsOffline();
if (allUserRegionsOffline) { if (allUserRegionsOffline) {
// Set stopped if no requests since last time we went around the loop. // Set stopped if no requests since last time we went around the loop.
// The remaining meta regions will be closed on our way out. // The remaining meta regions will be closed on our way out.
@ -1072,13 +1073,13 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
super("CompactionChecker", sleepTime, h); super("CompactionChecker", sleepTime, h);
this.instance = h; this.instance = h;
LOG.info("Runs every " + StringUtils.formatTime(sleepTime)); LOG.info("Runs every " + StringUtils.formatTime(sleepTime));
/* MajorCompactPriority is configurable. /* MajorCompactPriority is configurable.
* If not set, the compaction will use default priority. * If not set, the compaction will use default priority.
*/ */
this.majorCompactPriority = this.instance.conf. this.majorCompactPriority = this.instance.conf.
getInt("hbase.regionserver.compactionChecker.majorCompactPriority", getInt("hbase.regionserver.compactionChecker.majorCompactPriority",
DEFAULT_PRIORITY); DEFAULT_PRIORITY);
} }
@Override @Override
@ -1093,14 +1094,14 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
this.instance.compactSplitThread.requestCompaction(r, s, this.instance.compactSplitThread.requestCompaction(r, s,
getName() + " requests compaction"); getName() + " requests compaction");
} else if (s.isMajorCompaction()) { } else if (s.isMajorCompaction()) {
if (majorCompactPriority == DEFAULT_PRIORITY || if (majorCompactPriority == DEFAULT_PRIORITY ||
majorCompactPriority > r.getCompactPriority()) { majorCompactPriority > r.getCompactPriority()) {
this.instance.compactSplitThread.requestCompaction(r, s, this.instance.compactSplitThread.requestCompaction(r, s,
getName() + " requests major compaction; use default priority"); getName() + " requests major compaction; use default priority");
} else { } else {
this.instance.compactSplitThread.requestCompaction(r, s, this.instance.compactSplitThread.requestCompaction(r, s,
getName() + " requests major compaction; use configured priority", getName() + " requests major compaction; use configured priority",
this.majorCompactPriority); this.majorCompactPriority);
} }
} }
} catch (IOException e) { } catch (IOException e) {
@ -1225,7 +1226,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
totalStaticBloomSize += store.getTotalStaticBloomSize(); totalStaticBloomSize += store.getTotalStaticBloomSize();
} }
} }
hdfsBlocksDistribution.add(r.getHDFSBlocksDistribution()); hdfsBlocksDistribution.add(r.getHDFSBlocksDistribution());
} }
this.metrics.stores.set(stores); this.metrics.stores.set(stores);
@ -1262,7 +1263,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
getServerName().getHostname()); getServerName().getHostname());
int percent = (int) (localityIndex * 100); int percent = (int) (localityIndex * 100);
this.metrics.hdfsBlocksLocalityIndex.set(percent); this.metrics.hdfsBlocksLocalityIndex.set(percent);
} }
/** /**
@ -1351,7 +1352,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
while (true) { while (true) {
try { try {
this.infoServer = new InfoServer("regionserver", addr, port, false, this.conf); this.infoServer = new InfoServer("regionserver", addr, port, false, this.conf);
this.infoServer.addServlet("status", "/rs-status", RSStatusServlet.class); this.infoServer.addServlet("status", "/rs-status", RSStatusServlet.class);
this.infoServer.setAttribute(REGIONSERVER, this); this.infoServer.setAttribute(REGIONSERVER, this);
this.infoServer.start(); this.infoServer.start();
break; break;
@ -1834,7 +1835,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
+ "regionName is null"); + "regionName is null");
} }
HRegion region = getRegion(regionName); HRegion region = getRegion(regionName);
Integer lock = getLockFromId(put.getLockId()); Integer lock = getLockFromId(put.getLockId());
if (region.getCoprocessorHost() != null) { if (region.getCoprocessorHost() != null) {
Boolean result = region.getCoprocessorHost() Boolean result = region.getCoprocessorHost()
.preCheckAndPut(row, family, qualifier, compareOp, comparator, put); .preCheckAndPut(row, family, qualifier, compareOp, comparator, put);
@ -1873,7 +1874,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
+ "regionName is null"); + "regionName is null");
} }
HRegion region = getRegion(regionName); HRegion region = getRegion(regionName);
Integer lock = getLockFromId(delete.getLockId()); Integer lock = getLockFromId(delete.getLockId());
WritableByteArrayComparable comparator = new BinaryComparator(value); WritableByteArrayComparable comparator = new BinaryComparator(value);
if (region.getCoprocessorHost() != null) { if (region.getCoprocessorHost() != null) {
Boolean result = region.getCoprocessorHost().preCheckAndDelete(row, Boolean result = region.getCoprocessorHost().preCheckAndDelete(row,
@ -1914,7 +1915,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
+ "regionName is null"); + "regionName is null");
} }
HRegion region = getRegion(regionName); HRegion region = getRegion(regionName);
Integer lock = getLockFromId(delete.getLockId()); Integer lock = getLockFromId(delete.getLockId());
if (region.getCoprocessorHost() != null) { if (region.getCoprocessorHost() != null) {
Boolean result = region.getCoprocessorHost().preCheckAndDelete(row, Boolean result = region.getCoprocessorHost().preCheckAndDelete(row,
family, qualifier, compareOp, comparator, delete); family, qualifier, compareOp, comparator, delete);

View File

@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.io.HalfStoreFileReader;
import org.apache.hadoop.hbase.io.Reference; import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.io.hfile.BlockCache; import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.Compression; import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.io.hfile.DoubleBlockCache;
import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner; import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.io.hfile.HFileWriterV1; import org.apache.hadoop.hbase.io.hfile.HFileWriterV1;
@ -58,6 +59,7 @@ import org.apache.hadoop.hbase.util.BloomFilter;
import org.apache.hadoop.hbase.util.BloomFilterFactory; import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.BloomFilterWriter; import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.DirectMemoryUtils;
import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.RawComparator; import org.apache.hadoop.io.RawComparator;
@ -375,9 +377,15 @@ public class StoreFile {
// Calculate the amount of heap to give the heap. // Calculate the amount of heap to give the heap.
MemoryUsage mu = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage(); MemoryUsage mu = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
long cacheSize = (long)(mu.getMax() * cachePercentage); long cacheSize = (long)(mu.getMax() * cachePercentage);
int blockSize = conf.getInt("hbase.offheapcache.minblocksize", HFile.DEFAULT_BLOCKSIZE);
long offHeapCacheSize = (long) (conf.getFloat("hbase.offheapcache.percentage", (float) 0.95) * DirectMemoryUtils.getDirectMemorySize());
LOG.info("Allocating LruBlockCache with maximum size " + LOG.info("Allocating LruBlockCache with maximum size " +
StringUtils.humanReadableInt(cacheSize)); StringUtils.humanReadableInt(cacheSize));
hfileBlockCache = new LruBlockCache(cacheSize, DEFAULT_BLOCKSIZE_SMALL); if(offHeapCacheSize <= 0) {
hfileBlockCache = new LruBlockCache(cacheSize, DEFAULT_BLOCKSIZE_SMALL);
} else {
hfileBlockCache = new DoubleBlockCache(cacheSize, offHeapCacheSize, DEFAULT_BLOCKSIZE_SMALL, blockSize);
}
return hfileBlockCache; return hfileBlockCache;
} }

View File

@ -0,0 +1,95 @@
/**
* Copyright 2011 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.util.List;
import com.google.common.base.Preconditions;
public class DirectMemoryUtils {
/**
* @return the setting of -XX:MaxDirectMemorySize as a long. Returns 0 if
* -XX:MaxDirectMemorySize is not set.
*/
public static long getDirectMemorySize() {
RuntimeMXBean RuntimemxBean = ManagementFactory.getRuntimeMXBean();
List<String> arguments = RuntimemxBean.getInputArguments();
long multiplier = 1; //for the byte case.
for (String s : arguments) {
if (s.contains("-XX:MaxDirectMemorySize=")) {
String memSize = s.toLowerCase()
.replace("-xx:maxdirectmemorysize=", "").trim();
if (memSize.contains("k")) {
multiplier = 1024;
}
else if (memSize.contains("m")) {
multiplier = 1048576;
}
else if (memSize.contains("g")) {
multiplier = 1073741824;
}
memSize = memSize.replaceAll("[^\\d]", "");
long retValue = Long.parseLong(memSize);
return retValue * multiplier;
}
}
return 0;
}
/**
* DirectByteBuffers are garbage collected by using a phantom reference and a
* reference queue. Every once a while, the JVM checks the reference queue and
* cleans the DirectByteBuffers. However, as this doesn't happen
* immediately after discarding all references to a DirectByteBuffer, it's
* easy to OutOfMemoryError yourself using DirectByteBuffers. This function
* explicitly calls the Cleaner method of a DirectByteBuffer.
*
* @param toBeDestroyed
* The DirectByteBuffer that will be "cleaned". Utilizes reflection.
*
*/
public static void destroyDirectByteBuffer(ByteBuffer toBeDestroyed)
throws IllegalArgumentException, IllegalAccessException,
InvocationTargetException, SecurityException, NoSuchMethodException {
Preconditions.checkArgument(toBeDestroyed.isDirect(),
"toBeDestroyed isn't direct!");
Method cleanerMethod = toBeDestroyed.getClass().getMethod("cleaner");
cleanerMethod.setAccessible(true);
Object cleaner = cleanerMethod.invoke(toBeDestroyed);
Method cleanMethod = cleaner.getClass().getMethod("clean");
cleanMethod.setAccessible(true);
cleanMethod.invoke(cleaner);
}
}

View File

@ -0,0 +1,202 @@
/*
* Copyright 2011 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.*;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Random;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.MultithreadedTestUtil;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
public class CacheTestUtils {
public static void testCacheMultiThreaded(final BlockCache toBeTested,
final int blockSize, final int numThreads, final int numQueries,
final double passingScore) throws Exception {
Configuration conf = new Configuration();
MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(
conf);
final AtomicInteger totalQueries = new AtomicInteger();
final ConcurrentLinkedQueue<HFileBlockPair> blocksToTest = new ConcurrentLinkedQueue<HFileBlockPair>();
final AtomicInteger hits = new AtomicInteger();
final AtomicInteger miss = new AtomicInteger();
HFileBlockPair[] blocks = generateHFileBlocks(numQueries, blockSize);
blocksToTest.addAll(Arrays.asList(blocks));
for (int i = 0; i < numThreads; i++) {
TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
@Override
public void doAnAction() throws Exception {
if (!blocksToTest.isEmpty()) {
HFileBlockPair ourBlock = blocksToTest.remove();
toBeTested.cacheBlock(ourBlock.blockName, ourBlock.block);
Cacheable retrievedBlock = toBeTested.getBlock(ourBlock.blockName,
false);
if (retrievedBlock != null) {
assertEquals(ourBlock.block, retrievedBlock);
hits.incrementAndGet();
} else {
miss.incrementAndGet();
}
totalQueries.incrementAndGet();
}
}
};
ctx.addThread(t);
}
ctx.startThreads();
while (!blocksToTest.isEmpty() && ctx.shouldRun()) {
Thread.sleep(10);
}
ctx.stop();
if((double) hits.get() / ((double) hits.get() + (double) miss.get()) < passingScore){
fail("Too many nulls returned. Hits: " + hits.get() + " Misses: " + miss.get());
}
}
public static void testCacheSimple(BlockCache toBeTested, int blockSize,
int numBlocks) throws Exception {
HFileBlockPair[] blocks = generateHFileBlocks(numBlocks, blockSize);
// Confirm empty
for (HFileBlockPair block : blocks) {
assertNull(toBeTested.getBlock(block.blockName, true));
}
// Add blocks
for (HFileBlockPair block : blocks) {
toBeTested.cacheBlock(block.blockName, block.block);
}
// Check if all blocks are properly cached and contain the right
// information, or the blocks are null.
// MapMaker makes no guarantees when it will evict, so neither can we.
for (HFileBlockPair block : blocks) {
HFileBlock buf = (HFileBlock) toBeTested.getBlock(block.blockName, true);
if (buf != null) {
assertEquals(block.block, buf);
}
}
// Re-add some duplicate blocks. Hope nothing breaks.
for (HFileBlockPair block : blocks) {
try {
if (toBeTested.getBlock(block.blockName, true) != null) {
toBeTested.cacheBlock(block.blockName, block.block);
fail("Cache should not allow re-caching a block");
}
} catch (RuntimeException re) {
// expected
}
}
}
public static void hammerSingleKey(final BlockCache toBeTested,
int BlockSize, int numThreads, int numQueries) throws Exception {
final HFileBlockPair kv = generateHFileBlocks(BlockSize, 1)[0];
Configuration conf = new Configuration();
MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(
conf);
final AtomicInteger totalQueries = new AtomicInteger();
toBeTested.cacheBlock(kv.blockName, kv.block);
for (int i = 0; i < numThreads; i++) {
TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
@Override
public void doAnAction() throws Exception {
assertEquals(kv.block, toBeTested.getBlock(kv.blockName, false));
totalQueries.incrementAndGet();
}
};
ctx.addThread(t);
}
ctx.startThreads();
while (totalQueries.get() < numQueries && ctx.shouldRun()) {
Thread.sleep(10);
}
ctx.stop();
}
private static HFileBlockPair[] generateHFileBlocks(int blockSize,
int numBlocks) {
HFileBlockPair[] returnedBlocks = new HFileBlockPair[numBlocks];
Random rand = new Random();
HashSet<String> usedStrings = new HashSet<String>();
for (int i = 0; i < numBlocks; i++) {
// The buffer serialized size needs to match the size of BlockSize. So we
// declare our data size to be smaller than it by the serialization space
// required.
ByteBuffer cachedBuffer = ByteBuffer.allocate(blockSize
- HFileBlock.EXTRA_SERIALIZATION_SPACE);
rand.nextBytes(cachedBuffer.array());
cachedBuffer.rewind();
int onDiskSizeWithoutHeader = blockSize
- HFileBlock.EXTRA_SERIALIZATION_SPACE;
int uncompressedSizeWithoutHeader = blockSize
- HFileBlock.EXTRA_SERIALIZATION_SPACE;
long prevBlockOffset = rand.nextLong();
BlockType.DATA.write(cachedBuffer);
cachedBuffer.putInt(onDiskSizeWithoutHeader);
cachedBuffer.putInt(uncompressedSizeWithoutHeader);
cachedBuffer.putLong(prevBlockOffset);
cachedBuffer.rewind();
HFileBlock generated = new HFileBlock(BlockType.DATA,
onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader,
prevBlockOffset, cachedBuffer, false, blockSize);
String strKey;
/* No conflicting keys */
for (strKey = new Long(rand.nextLong()).toString(); !usedStrings
.add(strKey); strKey = new Long(rand.nextLong()).toString())
;
returnedBlocks[i] = new HFileBlockPair();
returnedBlocks[i].blockName = strKey;
returnedBlocks[i].block = generated;
}
return returnedBlocks;
}
private static class HFileBlockPair {
String blockName;
HFileBlock block;
}
}

View File

@ -21,10 +21,8 @@ package org.apache.hadoop.hbase.io.hfile;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.io.HeapSize;
import java.util.LinkedList; import java.util.LinkedList;
import junit.framework.TestCase; import junit.framework.TestCase;
public class TestCachedBlockQueue extends TestCase { public class TestCachedBlockQueue extends TestCase {
@ -132,10 +130,26 @@ public class TestCachedBlockQueue extends TestCase {
{ {
public CachedBlock(final long heapSize, String name, long accessTime) { public CachedBlock(final long heapSize, String name, long accessTime) {
super(name, super(name,
new HeapSize(){ new Cacheable(){
@Override @Override
public long heapSize() { public long heapSize() {
return ((int)(heapSize - CachedBlock.PER_BLOCK_OVERHEAD)); return ((int)(heapSize - CachedBlock.PER_BLOCK_OVERHEAD));
}
@Override
public int getSerializedLength() {
return 0;
}
@Override
public void serialize(ByteBuffer destination) {
}
@Override
public CacheableDeserializer<Cacheable> getDeserializer() {
// TODO Auto-generated method stub
return null;
}}, }},
accessTime,false); accessTime,false);
} }

View File

@ -19,6 +19,7 @@
*/ */
package org.apache.hadoop.hbase.io.hfile; package org.apache.hadoop.hbase.io.hfile;
import java.nio.ByteBuffer;
import java.util.Random; import java.util.Random;
import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.HeapSize;
@ -510,7 +511,7 @@ public class TestLruBlockCache extends TestCase {
LruBlockCache.DEFAULT_ACCEPTABLE_FACTOR)); LruBlockCache.DEFAULT_ACCEPTABLE_FACTOR));
} }
private static class CachedItem implements HeapSize { private static class CachedItem implements Cacheable {
String blockName; String blockName;
int size; int size;
@ -531,5 +532,20 @@ public class TestLruBlockCache extends TestCase {
+ ClassSize.align(blockName.length()) + ClassSize.align(blockName.length())
+ ClassSize.align(size); + ClassSize.align(size);
} }
@Override
public int getSerializedLength() {
return 0;
}
@Override
public CacheableDeserializer<Cacheable> getDeserializer() {
return null;
}
@Override
public void serialize(ByteBuffer destination) {
}
} }
} }

View File

@ -0,0 +1,67 @@
/**
* Copyright 2011 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile.slab;
import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
import org.apache.hadoop.hbase.io.hfile.slab.SingleSizeCache;
import org.junit.*;
/**
* Tests SingleSlabCache.
* <p>
*
* Tests will ensure that evictions operate when they're supposed to and do what
* they should, and that cached blocks are accessible when expected to be.
*/
public class TestSingleSizeCache {
SingleSizeCache cache;
final int CACHE_SIZE = 1000000;
final int NUM_BLOCKS = 100;
final int BLOCK_SIZE = CACHE_SIZE / NUM_BLOCKS;
final int NUM_THREADS = 100;
final int NUM_QUERIES = 10000;
@Before
public void setup() {
cache = new SingleSizeCache(BLOCK_SIZE, NUM_BLOCKS, null);
}
@After
public void tearDown() {
cache.shutdown();
}
@Test
public void testCacheSimple() throws Exception {
CacheTestUtils.testCacheSimple(cache, BLOCK_SIZE, NUM_QUERIES);
}
@Test
public void testCacheMultiThreaded() throws Exception {
CacheTestUtils.testCacheMultiThreaded(cache, BLOCK_SIZE,
NUM_THREADS, NUM_QUERIES, 0.80);
}
@Test
public void testCacheMultiThreadedSingleKey() throws Exception {
CacheTestUtils.hammerSingleKey(cache, BLOCK_SIZE, NUM_THREADS, NUM_QUERIES);
}
}

View File

@ -0,0 +1,72 @@
/**
* Copyright 2011 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile.slab;
import static org.junit.Assert.*;
import java.nio.ByteBuffer;
import org.junit.*;
/**Test cases for Slab.java*/
public class TestSlab {
static final int BLOCKSIZE = 1000;
static final int NUMBLOCKS = 100;
Slab testSlab;
ByteBuffer[] buffers = new ByteBuffer[NUMBLOCKS];
@Before
public void setUp() {
testSlab = new Slab(BLOCKSIZE, NUMBLOCKS);
}
@After
public void tearDown() {
testSlab.shutdown();
}
@Test
public void testBasicFunctionality() {
for (int i = 0; i < NUMBLOCKS; i++) {
buffers[i] = testSlab.alloc(BLOCKSIZE);
assertEquals(BLOCKSIZE, buffers[i].limit());
}
// write an unique integer to each allocated buffer.
for (int i = 0; i < NUMBLOCKS; i++) {
buffers[i].putInt(i);
}
// make sure the bytebuffers remain unique (the slab allocator hasn't
// allocated the same chunk of memory twice)
for (int i = 0; i < NUMBLOCKS; i++) {
buffers[i].putInt(i);
}
for (int i = 0; i < NUMBLOCKS; i++) {
testSlab.free(buffers[i]); // free all the buffers.
}
for (int i = 0; i < NUMBLOCKS; i++) {
buffers[i] = testSlab.alloc(BLOCKSIZE);
assertEquals(BLOCKSIZE, buffers[i].limit());
}
}
}

View File

@ -0,0 +1,83 @@
/**
* Copyright 2011 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile.slab;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
import org.apache.hadoop.hbase.io.hfile.slab.SlabCache;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.*;
/**
* Basic test of SlabCache. Puts and gets.
* <p>
*
* Tests will ensure that blocks that are uncached are identical to the ones
* being cached, and that the cache never exceeds its capacity. Note that its
* fine if the cache evicts before it reaches max capacity - Guava Mapmaker may
* choose to evict at any time.
*
*/
public class TestSlabCache {
static final int CACHE_SIZE = 1000000;
static final int NUM_BLOCKS = 101;
static final int BLOCK_SIZE = CACHE_SIZE / NUM_BLOCKS;
static final int NUM_THREADS = 1000;
static final int NUM_QUERIES = 10000;
SlabCache cache;
@Before
public void setup() {
cache = new SlabCache(CACHE_SIZE + BLOCK_SIZE * 2, BLOCK_SIZE);
cache.addSlabByConf(new Configuration());
}
@After
public void tearDown() {
cache.shutdown();
}
@Test
public void testElementPlacement() {
assertEquals(cache.getHigherBlock((int) BLOCK_SIZE).getKey().intValue(),
(int) (BLOCK_SIZE * 11 / 10));
assertEquals(cache.getHigherBlock((int) (BLOCK_SIZE * 2)).getKey()
.intValue(), (int) (BLOCK_SIZE * 21 / 10));
}
@Test
public void testCacheSimple() throws Exception {
CacheTestUtils.testCacheSimple(cache, BLOCK_SIZE, NUM_QUERIES);
}
@Test
public void testCacheMultiThreaded() throws Exception {
CacheTestUtils.testCacheMultiThreaded(cache, BLOCK_SIZE, NUM_THREADS,
NUM_QUERIES, 0.80);
}
@Test
public void testCacheMultiThreadedSingleKey() throws Exception {
CacheTestUtils.hammerSingleKey(cache, BLOCK_SIZE, NUM_THREADS, NUM_QUERIES);
}
}

View File

@ -24,7 +24,6 @@ import static org.junit.Assert.*;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Random; import java.util.Random;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -33,13 +32,10 @@ import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
import org.apache.hadoop.hbase.regionserver.MemStoreLAB.Allocation; import org.apache.hadoop.hbase.regionserver.MemStoreLAB.Allocation;
import org.junit.Test; import org.junit.Test;
import com.google.common.base.Function;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.MapMaker;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.primitives.Ints; import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
public class TestMemStoreLAB { public class TestMemStoreLAB {

View File

@ -40,9 +40,9 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.Reference.Range; import org.apache.hadoop.hbase.io.Reference.Range;
import org.apache.hadoop.hbase.io.hfile.BlockCache; import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.CacheStats;
import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner; import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.io.hfile.LruBlockCache.CacheStats;
import org.apache.hadoop.hbase.util.BloomFilterFactory; import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;