HBASE-13170 Allow block cache to be external

Summary: Add MemcachedBlockCache

Test Plan: Tested locally with PE and running memcached.

Subscribers: rajesh.nishtala, ndimiduk

Differential Revision: https://reviews.facebook.net/D34635
This commit is contained in:
Elliott Clark 2015-03-27 13:15:27 -07:00
parent ba6345f7d1
commit a4a235b8d1
10 changed files with 521 additions and 83 deletions

View File

@ -490,6 +490,11 @@
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
<dependency>
<groupId>net.spy</groupId>
<artifactId>spymemcached</artifactId>
<optional>true</optional>
</dependency>
<!-- tracing Dependencies -->
<dependency>
<groupId>org.apache.htrace</groupId>

View File

@ -206,6 +206,38 @@ org.apache.hadoop.util.StringUtils;
</%if>
</%def>
<%doc>
hits_tmpl: renders the hit/miss rows of a block cache statistics table.
Takes the BlockCache whose getStats() counters are displayed. Each row shows a
counter name, its value formatted with thousands separators, and a description.
</%doc>
<%def hits_tmpl>
<%args>
BlockCache bc;
</%args>
<tr>
<td>Hits</td>
<td><% String.format("%,d", bc.getStats().getHitCount()) %></td>
<td>Number requests that were cache hits</td>
</tr>
<tr>
<td>Hits Caching</td>
<td><% String.format("%,d", bc.getStats().getHitCachingCount()) %></td>
<td>Cache hit block requests but only requests set to cache block if a miss</td>
</tr>
<tr>
<td>Misses</td>
<td><% String.format("%,d", bc.getStats().getMissCount()) %></td>
<td>Block requests that were cache misses but set to cache missed blocks</td>
</tr>
<tr>
<td>Misses Caching</td>
<%doc>Use the caching-specific counter here, mirroring the "Hits Caching" row above.</%doc>
<td><% String.format("%,d", bc.getStats().getMissCachingCount()) %></td>
<td>Block requests that were cache misses but only requests set to use block cache</td>
</tr>
<tr>
<td>Hit Ratio</td>
<td><% String.format("%,.2f", bc.getStats().getHitRatio() * 100) %><% "%" %></td>
<td>Hit Count divided by total requests count</td>
</tr>
</%def>
<%def bc_stats>
<%args>
CacheConfig cacheConfig;
@ -235,31 +267,7 @@ org.apache.hadoop.util.StringUtils;
<td>Number of blocks in block cache</td>
</tr>
<& evictions_tmpl; bc = cacheConfig.getBlockCache(); &>
<tr>
<td>Hits</td>
<td><% String.format("%,d", cacheConfig.getBlockCache().getStats().getHitCount()) %></td>
<td>Number requests that were cache hits</td>
</tr>
<tr>
<td>Hits Caching</td>
<td><% String.format("%,d", cacheConfig.getBlockCache().getStats().getHitCachingCount()) %></td>
<td>Cache hit block requests but only requests set to cache block if a miss</td>
</tr>
<tr>
<td>Misses</td>
<td><% String.format("%,d", cacheConfig.getBlockCache().getStats().getMissCount()) %></td>
<td>Block requests that were cache misses but set to cache missed blocks</td>
</tr>
<tr>
<td>Misses Caching</td>
<td><% String.format("%,d", cacheConfig.getBlockCache().getStats().getMissCount()) %></td>
<td>Block requests that were cache misses but only requests set to use block cache</td>
</tr>
<tr>
<td>Hit Ratio</td>
<td><% String.format("%,.2f", cacheConfig.getBlockCache().getStats().getHitRatio() * 100) %><% "%" %></td>
<td>Hit Count divided by total requests count</td>
</tr>
<& hits_tmpl; bc = cacheConfig.getBlockCache(); &>
</table>
<p>If block cache is made up of more than one cache -- i.e. a L1 and a L2 -- then the above
are combined counts. Request count is sum of hits and misses.</p>
@ -349,7 +357,9 @@ are combined counts. Request count is sum of hits and misses.</p>
<td>Size of DATA Blocks</td>
</tr>
</%if>
<%if evictions %><& evictions_tmpl; bc = bc; &></%if>
<& evictions_tmpl; bc = bc; &>
<& hits_tmpl; bc = bc; &>
<%if bucketCache %>
<tr>
<td>Hits per Second</td>

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import com.google.common.annotations.VisibleForTesting;
@ -126,8 +127,25 @@ public class CacheConfig {
*/
public static final String BLOCKCACHE_BLOCKSIZE_KEY = "hbase.offheapcache.minblocksize";
// External block cache settings
private static final String EXTERNAL_BLOCKCACHE_KEY = "hbase.blockcache.use.external";
private static final boolean EXTERNAL_BLOCKCACHE_DEFAULT = false;
private static final String EXTERNAL_BLOCKCACHE_CLASS_KEY="hbase.blockcache.external.class";
/**
* Enum of all built in external block caches.
* This is used for config.
*/
private static enum ExternalBlockCaches {
memcached(MemcachedBlockCache.class);
// TODO(eclark): Consider more. Redis, etc.
Class<? extends BlockCache> clazz;
ExternalBlockCaches(Class<? extends BlockCache> clazz) {
this.clazz = clazz;
}
}
// Defaults
public static final boolean DEFAULT_CACHE_DATA_ON_READ = true;
public static final boolean DEFAULT_CACHE_DATA_ON_WRITE = false;
public static final boolean DEFAULT_IN_MEMORY = false;
@ -478,7 +496,44 @@ public class CacheConfig {
* @return Returns L2 block cache instance (for now it is BucketCache BlockCache all the time)
* or null if not supposed to be a L2.
*/
private static BucketCache getL2(final Configuration c, final MemoryUsage mu) {
private static BlockCache getL2(final Configuration c, final MemoryUsage mu) {
  final boolean useExternal = c.getBoolean(EXTERNAL_BLOCKCACHE_KEY, EXTERNAL_BLOCKCACHE_DEFAULT);
  if (LOG.isDebugEnabled()) {
    final String flavour;
    if (useExternal) {
      flavour = " External";
    } else {
      flavour = " Internal";
    }
    LOG.debug("Trying to use " + flavour + " l2 cache");
  }
  // When an external cache is requested it is used as the L2; otherwise the L2 is
  // whatever the bucket cache configuration yields.
  return useExternal ? getExternalBlockcache(c) : getBucketCache(c, mu);
}
/**
 * Creates the external block cache selected by {@code EXTERNAL_BLOCKCACHE_CLASS_KEY}.
 * The configured value is first looked up as the name of a built-in
 * {@link ExternalBlockCaches} constant. If it is not one, the value is loaded as a class
 * through {@link Configuration#getClass(String, Class)}.
 *
 * @param c configuration to read the class from; also passed to the cache's constructor
 * @return the new block cache, or null if the instance could not be created
 */
private static BlockCache getExternalBlockcache(Configuration c) {
  // Default to the exact name of the built-in constant so the enum lookup succeeds when
  // nothing is configured, instead of relying on the exception fallback below.
  final String configured =
      c.get(EXTERNAL_BLOCKCACHE_CLASS_KEY, ExternalBlockCaches.memcached.name());
  Class<?> klass;
  try {
    klass = ExternalBlockCaches.valueOf(configured).clazz;
  } catch (IllegalArgumentException exception) {
    // Not the name of a built-in cache; let the configuration resolve it as a class.
    klass = c.getClass(EXTERNAL_BLOCKCACHE_CLASS_KEY, MemcachedBlockCache.class);
  }

  // Now try and create an instance of the block cache.
  try {
    LOG.info("Creating external block cache of type: " + klass);
    return (BlockCache) ReflectionUtils.newInstance(klass, c);
  } catch (Exception e) {
    // Best effort: a cache that cannot be created is reported and treated as absent.
    LOG.warn("Error creating external block cache", e);
  }
  return null;
}
private static BlockCache getBucketCache(Configuration c, MemoryUsage mu) {
// Check for L2. ioengine name must be non-null.
String bucketCacheIOEngineName = c.get(BUCKET_CACHE_IOENGINE_KEY, null);
if (bucketCacheIOEngineName == null || bucketCacheIOEngineName.length() <= 0) return null;
@ -533,22 +588,27 @@ public class CacheConfig {
LruBlockCache l1 = getL1(conf, mu);
// blockCacheDisabled is set as a side-effect of getL1(), so check it again after the call.
if (blockCacheDisabled) return null;
BucketCache l2 = getL2(conf, mu);
BlockCache l2 = getL2(conf, mu);
if (l2 == null) {
GLOBAL_BLOCK_CACHE_INSTANCE = l1;
} else {
boolean useExternal = conf.getBoolean(EXTERNAL_BLOCKCACHE_KEY, EXTERNAL_BLOCKCACHE_DEFAULT);
boolean combinedWithLru = conf.getBoolean(BUCKET_CACHE_COMBINED_KEY,
DEFAULT_BUCKET_CACHE_COMBINED);
if (combinedWithLru) {
GLOBAL_BLOCK_CACHE_INSTANCE = new CombinedBlockCache(l1, l2);
if (useExternal) {
GLOBAL_BLOCK_CACHE_INSTANCE = new InclusiveCombinedBlockCache(l1, l2);
} else {
// L1 and L2 are not 'combined'. They are connected via the LruBlockCache victimhandler
// mechanism. It is a little ugly but works according to the following: when the
// background eviction thread runs, blocks evicted from L1 will go to L2 AND when we get
// a block from the L1 cache, if not in L1, we will search L2.
l1.setVictimCache(l2);
GLOBAL_BLOCK_CACHE_INSTANCE = l1;
if (combinedWithLru) {
GLOBAL_BLOCK_CACHE_INSTANCE = new CombinedBlockCache(l1, l2);
} else {
// L1 and L2 are not 'combined'. They are connected via the LruBlockCache victimhandler
// mechanism. It is a little ugly but works according to the following: when the
// background eviction thread runs, blocks evicted from L1 will go to L2 AND when we get
// a block from the L1 cache, if not in L1, we will search L2.
GLOBAL_BLOCK_CACHE_INSTANCE = l1;
}
}
l1.setVictimCache(l2);
}
return GLOBAL_BLOCK_CACHE_INSTANCE;
}

View File

@ -25,32 +25,37 @@ import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
/**
* CombinedBlockCache is an abstraction layer that combines
* {@link LruBlockCache} and {@link BucketCache}. The smaller lruCache is used
* to cache bloom blocks and index blocks. The larger bucketCache is used to
* to cache bloom blocks and index blocks. The larger l2Cache is used to
* cache data blocks. {@link #getBlock(BlockCacheKey, boolean, boolean, boolean)} reads
* first from the smaller lruCache before looking for the block in the bucketCache. Blocks evicted
* first from the smaller lruCache before looking for the block in the l2Cache. Blocks evicted
* from lruCache are put into the bucket cache.
* Metrics are the combined size and hits and misses of both caches.
*
*/
@InterfaceAudience.Private
public class CombinedBlockCache implements ResizableBlockCache, HeapSize {
private final LruBlockCache lruCache;
private final BucketCache bucketCache;
private final CombinedCacheStats combinedCacheStats;
protected final LruBlockCache lruCache;
protected final BlockCache l2Cache;
protected final CombinedCacheStats combinedCacheStats;
public CombinedBlockCache(LruBlockCache lruCache, BucketCache bucketCache) {
public CombinedBlockCache(LruBlockCache lruCache, BlockCache l2Cache) {
this.lruCache = lruCache;
this.bucketCache = bucketCache;
this.l2Cache = l2Cache;
this.combinedCacheStats = new CombinedCacheStats(lruCache.getStats(),
bucketCache.getStats());
l2Cache.getStats());
}
@Override
public long heapSize() {
return lruCache.heapSize() + bucketCache.heapSize();
long l2size = 0;
if (l2Cache instanceof HeapSize) {
l2size = ((HeapSize) l2Cache).heapSize();
}
return lruCache.heapSize() + l2size;
}
@Override
@ -60,7 +65,7 @@ public class CombinedBlockCache implements ResizableBlockCache, HeapSize {
if (isMetaBlock || cacheDataInL1) {
lruCache.cacheBlock(cacheKey, buf, inMemory, cacheDataInL1);
} else {
bucketCache.cacheBlock(cacheKey, buf, inMemory, cacheDataInL1);
l2Cache.cacheBlock(cacheKey, buf, inMemory, false);
}
}
@ -73,22 +78,24 @@ public class CombinedBlockCache implements ResizableBlockCache, HeapSize {
public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching,
boolean repeat, boolean updateCacheMetrics) {
// TODO: is there a hole here, or just awkwardness since in the lruCache getBlock
// we end up calling bucketCache.getBlock.
// we end up calling l2Cache.getBlock.
if (lruCache.containsBlock(cacheKey)) {
return lruCache.getBlock(cacheKey, caching, repeat, updateCacheMetrics);
}
return bucketCache.getBlock(cacheKey, caching, repeat, updateCacheMetrics);
Cacheable result = l2Cache.getBlock(cacheKey, caching, repeat, updateCacheMetrics);
return result;
}
@Override
public boolean evictBlock(BlockCacheKey cacheKey) {
return lruCache.evictBlock(cacheKey) || bucketCache.evictBlock(cacheKey);
return lruCache.evictBlock(cacheKey) || l2Cache.evictBlock(cacheKey);
}
@Override
public int evictBlocksByHfileName(String hfileName) {
return lruCache.evictBlocksByHfileName(hfileName)
+ bucketCache.evictBlocksByHfileName(hfileName);
+ l2Cache.evictBlocksByHfileName(hfileName);
}
@Override
@ -99,27 +106,27 @@ public class CombinedBlockCache implements ResizableBlockCache, HeapSize {
@Override
public void shutdown() {
lruCache.shutdown();
bucketCache.shutdown();
l2Cache.shutdown();
}
@Override
public long size() {
return lruCache.size() + bucketCache.size();
return lruCache.size() + l2Cache.size();
}
@Override
public long getFreeSize() {
return lruCache.getFreeSize() + bucketCache.getFreeSize();
return lruCache.getFreeSize() + l2Cache.getFreeSize();
}
@Override
public long getCurrentSize() {
return lruCache.getCurrentSize() + bucketCache.getCurrentSize();
return lruCache.getCurrentSize() + l2Cache.getCurrentSize();
}
@Override
public long getBlockCount() {
return lruCache.getBlockCount() + bucketCache.getBlockCount();
return lruCache.getBlockCount() + l2Cache.getBlockCount();
}
private static class CombinedCacheStats extends CacheStats {
@ -205,7 +212,7 @@ public class CombinedBlockCache implements ResizableBlockCache, HeapSize {
@Override
public BlockCache[] getBlockCaches() {
return new BlockCache [] {this.lruCache, this.bucketCache};
return new BlockCache [] {this.lruCache, this.l2Cache};
}
@Override

View File

@ -116,7 +116,7 @@ public class HFileBlock implements Cacheable {
*/
static final int CHECKSUM_SIZE = Bytes.SIZEOF_INT;
private static final CacheableDeserializer<Cacheable> blockDeserializer =
static final CacheableDeserializer<Cacheable> blockDeserializer =
new CacheableDeserializer<Cacheable>() {
public HFileBlock deserialize(ByteBuffer buf, boolean reuse) throws IOException{
buf.limit(buf.limit() - HFileBlock.EXTRA_SERIALIZATION_SPACE).rewind();
@ -130,13 +130,13 @@ public class HFileBlock implements Cacheable {
buf.position(buf.limit());
buf.limit(buf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE);
boolean usesChecksum = buf.get() == (byte)1;
HFileBlock ourBuffer = new HFileBlock(newByteBuffer, usesChecksum);
ourBuffer.offset = buf.getLong();
ourBuffer.nextBlockOnDiskSizeWithHeader = buf.getInt();
if (ourBuffer.hasNextBlockHeader()) {
ourBuffer.buf.limit(ourBuffer.buf.limit() - ourBuffer.headerSize());
HFileBlock hFileBlock = new HFileBlock(newByteBuffer, usesChecksum);
hFileBlock.offset = buf.getLong();
hFileBlock.nextBlockOnDiskSizeWithHeader = buf.getInt();
if (hFileBlock.hasNextBlockHeader()) {
hFileBlock.buf.limit(hFileBlock.buf.limit() - hFileBlock.headerSize());
}
return ourBuffer;
return hFileBlock;
}
@Override
@ -670,7 +670,7 @@ public class HFileBlock implements Cacheable {
* @return true if succeeded reading the extra bytes
* @throws IOException if failed to read the necessary bytes
*/
public static boolean readWithExtra(InputStream in, byte buf[],
public static boolean readWithExtra(InputStream in, byte[] buf,
int bufOffset, int necessaryLen, int extraLen) throws IOException {
int bytesRemaining = necessaryLen + extraLen;
while (bytesRemaining > 0) {
@ -776,7 +776,8 @@ public class HFileBlock implements Cacheable {
/**
* Valid in the READY state. Contains the header and the uncompressed (but
* potentially encoded, if this is a data block) bytes, so the length is
* {@link #uncompressedSizeWithoutHeader} + {@link org.apache.hadoop.hbase.HConstants#HFILEBLOCK_HEADER_SIZE}.
* {@link #uncompressedSizeWithoutHeader} +
* {@link org.apache.hadoop.hbase.HConstants#HFILEBLOCK_HEADER_SIZE}.
* Does not store checksums.
*/
private byte[] uncompressedBytesWithHeader;
@ -1059,7 +1060,9 @@ public class HFileBlock implements Cacheable {
*/
int getOnDiskSizeWithoutHeader() {
expectState(State.BLOCK_READY);
return onDiskBytesWithHeader.length + onDiskChecksum.length - HConstants.HFILEBLOCK_HEADER_SIZE;
return onDiskBytesWithHeader.length
+ onDiskChecksum.length
- HConstants.HFILEBLOCK_HEADER_SIZE;
}
/**
@ -1832,7 +1835,8 @@ public class HFileBlock implements Cacheable {
if (!fileContext.isUseHBaseChecksum() || this.fileContext.getBytesPerChecksum() == 0) {
return 0;
}
return (int)ChecksumUtil.numBytes(onDiskDataSizeWithHeader, this.fileContext.getBytesPerChecksum());
return (int) ChecksumUtil.numBytes(onDiskDataSizeWithHeader,
this.fileContext.getBytesPerChecksum());
}
/**
@ -1886,8 +1890,8 @@ public class HFileBlock implements Cacheable {
byte[] magicBuf = new byte[Math.min(buf.limit() - buf.position(), BlockType.MAGIC_LENGTH)];
buf.get(magicBuf);
BlockType bt = BlockType.parse(magicBuf, 0, BlockType.MAGIC_LENGTH);
int compressedBlockSizeNoHeader = buf.getInt();;
int uncompressedBlockSizeNoHeader = buf.getInt();;
int compressedBlockSizeNoHeader = buf.getInt();
int uncompressedBlockSizeNoHeader = buf.getInt();
long prevBlockOffset = buf.getLong();
byte cksumtype = buf.get();
long bytesPerChecksum = buf.getInt();

View File

@ -0,0 +1,58 @@
/**
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.io.hfile;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
 * A {@link CombinedBlockCache} in which every block handed to
 * {@link #cacheBlock(BlockCacheKey, Cacheable, boolean, boolean)} is written to both the
 * L1 lru cache and the L2 cache, and in which reads are served through the L1 cache.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class InclusiveCombinedBlockCache extends CombinedBlockCache implements BlockCache {

  public InclusiveCombinedBlockCache(LruBlockCache l1, BlockCache l2) {
    super(l1, l2);
  }

  /**
   * Stores the block in both caches; this is what makes the combined cache inclusive.
   *
   * @param cacheKey The block's cache key.
   * @param buf The block contents wrapped in a ByteBuffer.
   * @param inMemory Whether block should be treated as in-memory. This parameter is only
   *                 useful for the L1 lru cache.
   * @param cacheDataInL1 This is totally ignored.
   */
  @Override
  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory,
      final boolean cacheDataInL1) {
    // L1 first.
    lruCache.cacheBlock(cacheKey, buf, inMemory, true);
    // Then L2. This assumes that insertion into the L2 block cache is either async or
    // very fast.
    l2Cache.cacheBlock(cacheKey, buf, inMemory, true);
  }

  /**
   * Reads the block through the L1 lru cache only.
   */
  @Override
  public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching,
      boolean repeat, boolean updateCacheMetrics) {
    // On all external cache set ups the lru should have the l2 cache set as the
    // victimHandler. Because of that, requests that miss inside of the lru block cache
    // are expected to be tried in the l2 block cache by the lru cache itself.
    return lruCache.getBlock(cacheKey, caching, repeat, updateCacheMetrics);
  }
}

View File

@ -197,8 +197,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
private boolean forceInMemory;
/** Where to send victims (blocks evicted/missing from the cache) */
// TODO: Fix it so this is not explicit reference to a particular BlockCache implementation.
private BucketCache victimHandler = null;
private BlockCache victimHandler = null;
/**
* Default constructor. Specify maximum size and expected average block
@ -419,8 +418,17 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
LruCachedBlock cb = map.get(cacheKey);
if (cb == null) {
if (!repeat && updateCacheMetrics) stats.miss(caching);
if (victimHandler != null) {
return victimHandler.getBlock(cacheKey, caching, repeat, updateCacheMetrics);
// If there is another block cache then try and read there.
// However if this is a retry ( second time in double checked locking )
// And it's already a miss then the l2 will also be a miss.
if (victimHandler != null && !repeat) {
Cacheable result = victimHandler.getBlock(cacheKey, caching, repeat, updateCacheMetrics);
// Promote this to L1.
if (result != null && caching) {
cacheBlock(cacheKey, result, /* inMemory = */ false, /* cacheData = */ true);
}
return result;
}
return null;
}
@ -489,10 +497,14 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
}
stats.evicted(block.getCachedTime());
if (evictedByEvictionProcess && victimHandler != null) {
boolean wait = getCurrentSize() < acceptableSize();
boolean inMemory = block.getPriority() == BlockPriority.MEMORY;
victimHandler.cacheBlockWithWait(block.getCacheKey(), block.getBuffer(),
inMemory, wait);
if (victimHandler instanceof BucketCache) {
boolean wait = getCurrentSize() < acceptableSize();
boolean inMemory = block.getPriority() == BlockPriority.MEMORY;
((BucketCache)victimHandler).cacheBlockWithWait(block.getCacheKey(), block.getBuffer(),
inMemory, wait);
} else {
victimHandler.cacheBlock(block.getCacheKey(), block.getBuffer());
}
}
return block.heapSize();
}
@ -787,7 +799,10 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
synchronized(this) {
try {
this.wait(1000 * 10/*Don't wait for ever*/);
} catch(InterruptedException e) {}
} catch(InterruptedException e) {
LOG.warn("Interrupted eviction thread ", e);
Thread.currentThread().interrupt();
}
}
LruBlockCache cache = this.cache.get();
if (cache == null) break;
@ -1057,7 +1072,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
return counts;
}
public void setVictimCache(BucketCache handler) {
public void setVictimCache(BlockCache handler) {
assert victimHandler == null;
victimHandler = handler;
}
@ -1067,7 +1082,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
return map;
}
BucketCache getVictimHandler() {
BlockCache getVictimHandler() {
return this.victimHandler;
}

View File

@ -0,0 +1,272 @@
/**
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.io.hfile;
import net.spy.memcached.CachedData;
import net.spy.memcached.ConnectionFactoryBuilder;
import net.spy.memcached.FailureMode;
import net.spy.memcached.MemcachedClient;
import net.spy.memcached.transcoders.Transcoder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.ExecutionException;
/**
 * Class to store blocks into memcached.
 * This should only be used on a cluster of Memcached daemons that are tuned well and have a
 * good network connection to the HBase regionservers. Any other use will likely slow down HBase
 * greatly.
 *
 * <p>Only {@link HFileBlock} instances are stored; other {@link Cacheable} types are skipped.
 * Size, block count and iteration are not tracked by this class, so the corresponding methods
 * return zero or an empty iterator.
 */
@InterfaceAudience.Private
public class MemcachedBlockCache implements BlockCache {
  private static final Log LOG = LogFactory.getLog(MemcachedBlockCache.class.getName());

  // Some memcache versions won't take more than 1024 * 1024. So set the limit below
  // that just in case this client is used with those versions.
  public static final int MAX_SIZE = 1020 * 1024;

  // Expiration, in seconds, passed to memcached when a block is added. Thirty days is the
  // largest value memcached treats as a relative time; anything larger is interpreted as an
  // absolute unix timestamp.
  public static final int MAX_TIME = 60 * 60 * 24 * 30;

  // Config key for what memcached servers to use.
  // They should be specified in a comma separated list with ports.
  // like:
  //
  // host1:11211,host3:8080,host4:11211
  public static final String MEMCACHED_CONFIG_KEY = "hbase.cache.memcached.servers";
  public static final String MEMCACHED_TIMEOUT_KEY = "hbase.cache.memcached.timeout";
  public static final String MEMCACHED_OPTIMEOUT_KEY = "hbase.cache.memcached.optimeout";
  public static final long MEMCACHED_DEFAULT_TIMEOUT = 500;

  private final MemcachedClient client;
  private final HFileBlockTranscoder tc = new HFileBlockTranscoder();
  private final CacheStats cacheStats = new CacheStats("MemcachedBlockCache");

  /**
   * Builds a memcached client from the given configuration.
   *
   * @param c configuration supplying the server list ({@link #MEMCACHED_CONFIG_KEY}) and the
   *          timeouts ({@link #MEMCACHED_OPTIMEOUT_KEY}, {@link #MEMCACHED_TIMEOUT_KEY})
   * @throws IOException if the memcached client can not be created
   * @throws IllegalArgumentException if the configured server list contains no servers
   */
  public MemcachedBlockCache(Configuration c) throws IOException {
    LOG.info("Creating MemcachedBlockCache");

    long opTimeout = c.getLong(MEMCACHED_OPTIMEOUT_KEY, MEMCACHED_DEFAULT_TIMEOUT);
    long queueTimeout = c.getLong(MEMCACHED_TIMEOUT_KEY, opTimeout + MEMCACHED_DEFAULT_TIMEOUT);

    // NOTE(review): the read buffer expression multiplies the default block size by 4 * 1024,
    // while the original comment described it as "4 times larger than the default block".
    // The value is left unchanged here; confirm which of the two was intended.
    ConnectionFactoryBuilder builder = new ConnectionFactoryBuilder()
        .setOpTimeout(opTimeout)
        .setOpQueueMaxBlockTime(queueTimeout) // Cap the max time before anything times out
        .setFailureMode(FailureMode.Redistribute)
        .setShouldOptimize(true)              // When regions move lots of reads happen together
                                              // So combining them into single requests is nice.
        .setDaemon(true)                      // Don't keep threads around past the end of days.
        .setUseNagleAlgorithm(false)          // Favor latency over batching of small packets.
        .setReadBufferSize(HConstants.DEFAULT_BLOCKSIZE * 4 * 1024);

    // Assume only the localhost is serving memcached.
    // A la mcrouter or co-locating memcached with split regionservers.
    //
    // If this config is a pool of memcached servers they will all be used according to the
    // default hashing scheme defined by the memcache client. Spy Memcached client in this
    // case.
    String serverListString = c.get(MEMCACHED_CONFIG_KEY, "localhost:11211");
    String[] servers = serverListString.split(",");
    List<InetSocketAddress> serverAddresses = new ArrayList<InetSocketAddress>(servers.length);
    for (String s : servers) {
      // Tolerate whitespace around entries ("a:1, b:2") and stray commas ("a:1,,b:2").
      String hostAndPort = s.trim();
      if (hostAndPort.isEmpty()) {
        continue;
      }
      serverAddresses.add(Addressing.createInetSocketAddressFromHostAndPortStr(hostAndPort));
    }
    if (serverAddresses.isEmpty()) {
      throw new IllegalArgumentException("No memcached servers configured in "
          + MEMCACHED_CONFIG_KEY + ": '" + serverListString + "'");
    }

    client = new MemcachedClient(builder.build(), serverAddresses);
  }

  /**
   * Caches the block. {@code inMemory} and {@code cacheDataInL1} are not used by this cache.
   */
  @Override
  public void cacheBlock(BlockCacheKey cacheKey,
                         Cacheable buf,
                         boolean inMemory,
                         boolean cacheDataInL1) {
    cacheBlock(cacheKey, buf);
  }

  /**
   * Adds the block to memcached under {@code cacheKey.toString()}. The add is issued without
   * waiting for its result. Anything that is not an {@link HFileBlock} is not cached.
   */
  @Override
  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
    if (buf instanceof HFileBlock) {
      // The second argument is the expiration time, not a size.
      client.add(cacheKey.toString(), MAX_TIME, (HFileBlock) buf, tc);
    } else {
      if (LOG.isDebugEnabled()) {
        LOG.debug("MemcachedBlockCache can not cache Cacheable's of type "
            + buf.getClass().toString());
      }
    }
  }

  /**
   * Fetches the block from memcached. Any exception raised while fetching is logged at debug
   * level and reported to the caller as a miss.
   *
   * @return the cached block, or null on a miss or on any failure
   */
  @Override
  public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching,
                            boolean repeat, boolean updateCacheMetrics) {
    // Assume that nothing is in the block cache
    HFileBlock result = null;

    try (TraceScope traceScope = Trace.startSpan("MemcachedBlockCache.getBlock")) {
      result = client.get(cacheKey.toString(), tc);
    } catch (Exception e) {
      // Catch a pretty broad set of exceptions to limit any changes in the memcache client
      // and how it handles failures from leaking into the read path.
      // result is still null here, so the failure is counted as a miss below.
      if (LOG.isDebugEnabled()) {
        LOG.debug("Exception pulling from memcached [ "
            + cacheKey.toString()
            + " ]. Treating as a miss.", e);
      }
    } finally {
      // Update stats if this request doesn't have it turned off 100% of the time
      if (updateCacheMetrics) {
        if (result == null) {
          cacheStats.miss(caching);
        } else {
          cacheStats.hit(caching);
        }
      }
    }

    return result;
  }

  /**
   * Deletes the block from memcached and waits for the outcome.
   *
   * @return the result of the delete, or false if it was interrupted or failed
   */
  @Override
  public boolean evictBlock(BlockCacheKey cacheKey) {
    try {
      cacheStats.evict();
      return client.delete(cacheKey.toString()).get();
    } catch (InterruptedException e) {
      LOG.warn("Error deleting " + cacheKey.toString(), e);
      // Preserve the interrupt for callers further up the stack.
      Thread.currentThread().interrupt();
    } catch (ExecutionException e) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Error deleting " + cacheKey.toString(), e);
      }
    }
    return false;
  }

  /**
   * This method does nothing so that memcached can handle all evictions.
   */
  @Override
  public int evictBlocksByHfileName(String hfileName) {
    return 0;
  }

  @Override
  public CacheStats getStats() {
    return cacheStats;
  }

  @Override
  public void shutdown() {
    client.shutdown();
  }

  /** Not tracked by this cache; always zero. */
  @Override
  public long size() {
    return 0;
  }

  /** Not tracked by this cache; always zero. */
  @Override
  public long getFreeSize() {
    return 0;
  }

  /** Not tracked by this cache; always zero. */
  @Override
  public long getCurrentSize() {
    return 0;
  }

  /** Not tracked by this cache; always zero. */
  @Override
  public long getBlockCount() {
    return 0;
  }

  /**
   * @return an iterator that is always empty
   */
  @Override
  public Iterator<CachedBlock> iterator() {
    return new Iterator<CachedBlock>() {
      @Override
      public boolean hasNext() {
        return false;
      }

      @Override
      public CachedBlock next() {
        throw new NoSuchElementException("MemcachedBlockCache can't iterate over blocks.");
      }

      @Override
      public void remove() {
        // Nothing can ever be returned by next(), so there is never anything to remove.
        throw new UnsupportedOperationException(
            "MemcachedBlockCache can't iterate over blocks.");
      }
    };
  }

  /**
   * @return null; this cache declares no sub caches
   */
  @Override
  public BlockCache[] getBlockCaches() {
    return null;
  }

  /**
   * Class to encode and decode an HFileBlock to and from memcached's resulting byte arrays.
   */
  private static class HFileBlockTranscoder implements Transcoder<HFileBlock> {

    @Override
    public boolean asyncDecode(CachedData d) {
      return false;
    }

    /** Serializes the block into a freshly allocated buffer of its serialized length. */
    @Override
    public CachedData encode(HFileBlock block) {
      ByteBuffer bb = ByteBuffer.allocate(block.getSerializedLength());
      block.serialize(bb);
      return new CachedData(0, bb.array(), CachedData.MAX_SIZE);
    }

    /**
     * @return the deserialized block, or null if deserialization failed with an IOException
     */
    @Override
    public HFileBlock decode(CachedData d) {
      try {
        ByteBuffer buf = ByteBuffer.wrap(d.getData());
        return (HFileBlock) HFileBlock.blockDeserializer.deserialize(buf, true);
      } catch (IOException e) {
        LOG.warn("Error deserializing data from memcached", e);
      }
      return null;
    }

    @Override
    public int getMaxSize() {
      return MAX_SIZE;
    }
  }
}

View File

@ -283,9 +283,9 @@ public class TestCacheConfig {
// TODO: Assert sizes allocated are right and proportions.
LruBlockCache lbc = (LruBlockCache)cc.getBlockCache();
assertEquals(lruExpectedSize, lbc.getMaxSize());
BucketCache bc = lbc.getVictimHandler();
BlockCache bc = lbc.getVictimHandler();
// getMaxSize comes back in bytes but we specified size in MB
assertEquals(bcExpectedSize, bc.getMaxSize());
assertEquals(bcExpectedSize, ((BucketCache) bc).getMaxSize());
// Test the L1+L2 deploy works as we'd expect with blocks evicted from L1 going to L2.
long initialL1BlockCount = lbc.getBlockCount();
long initialL2BlockCount = bc.getBlockCount();

View File

@ -1174,6 +1174,7 @@
<netty.version>4.0.23.Final</netty.version>
<joni.version>2.1.2</joni.version>
<jcodings.version>1.0.8</jcodings.version>
<spy.version>2.11.6</spy.version>
<!-- Plugin Dependencies -->
<maven.assembly.version>2.4</maven.assembly.version>
<maven.antrun.version>1.6</maven.antrun.version>
@ -1665,6 +1666,12 @@
<artifactId>disruptor</artifactId>
<version>${disruptor.version}</version>
</dependency>
<dependency>
<groupId>net.spy</groupId>
<artifactId>spymemcached</artifactId>
<version>${spy.version}</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.jmock</groupId>
<artifactId>jmock-junit4</artifactId>