diff --git a/hbase-server/pom.xml b/hbase-server/pom.xml
index 3ccc73884dc..9f8a17ac849 100644
--- a/hbase-server/pom.xml
+++ b/hbase-server/pom.xml
@@ -471,6 +471,11 @@
       <groupId>io.netty</groupId>
       <artifactId>netty-all</artifactId>
     </dependency>
+    <dependency>
+      <groupId>net.spy</groupId>
+      <artifactId>spymemcached</artifactId>
+      <optional>true</optional>
+    </dependency>
     <dependency>
       <groupId>org.apache.htrace</groupId>
diff --git a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/BlockCacheTmpl.jamon b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/BlockCacheTmpl.jamon
index e4ff70fd1ee..04191968740 100644
--- a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/BlockCacheTmpl.jamon
+++ b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/BlockCacheTmpl.jamon
@@ -206,6 +206,38 @@ org.apache.hadoop.util.StringUtils;
 </%import>
 
+<%def hits_tmpl>
+<%args>
+    BlockCache bc;
+</%args>
+    <tr>
+        <td>Hits</td>
+        <td><% String.format("%,d", bc.getStats().getHitCount()) %></td>
+        <td>Number of requests that were cache hits</td>
+    </tr>
+    <tr>
+        <td>Hits Caching</td>
+        <td><% String.format("%,d", bc.getStats().getHitCachingCount()) %></td>
+        <td>Cache hit block requests, counting only requests set to cache a block on miss</td>
+    </tr>
+    <tr>
+        <td>Misses</td>
+        <td><% String.format("%,d", bc.getStats().getMissCount()) %></td>
+        <td>Block requests that were cache misses and were set to cache the missed block</td>
+    </tr>
+    <tr>
+        <td>Misses Caching</td>
+        <td><% String.format("%,d", bc.getStats().getMissCachingCount()) %></td>
+        <td>Block requests that were cache misses, counting only requests set to use the block cache</td>
+    </tr>
+    <tr>
+        <td>Hit Ratio</td>
+        <td><% String.format("%,.2f", bc.getStats().getHitRatio() * 100) %><% "%" %></td>
+        <td>Hit count divided by total request count</td>
+    </tr>
+</%def>
+
 <%def bc_stats>
 <%args>
     CacheConfig cacheConfig;
 </%args>
@@ -235,31 +267,7 @@
         <td>Number of blocks in block cache</td>
     </tr>
     <& evictions_tmpl; bc = cacheConfig.getBlockCache(); &>
-    <tr>
-        <td>Hits</td>
-        <td><% String.format("%,d", cacheConfig.getBlockCache().getStats().getHitCount()) %></td>
-        <td>Number requests that were cache hits</td>
-    </tr>
-    <tr>
-        <td>Hits Caching</td>
-        <td><% String.format("%,d", cacheConfig.getBlockCache().getStats().getHitCachingCount()) %></td>
-        <td>Cache hit block requests but only requests set to cache block if a miss</td>
-    </tr>
-    <tr>
-        <td>Misses</td>
-        <td><% String.format("%,d", cacheConfig.getBlockCache().getStats().getMissCount()) %></td>
-        <td>Block requests that were cache misses but set to cache missed blocks</td>
-    </tr>
-    <tr>
-        <td>Misses Caching</td>
-        <td><% String.format("%,d", cacheConfig.getBlockCache().getStats().getMissCount()) %></td>
-        <td>Block requests that were cache misses but only requests set to use block cache</td>
-    </tr>
-    <tr>
-        <td>Hit Ratio</td>
-        <td><% String.format("%,.2f", cacheConfig.getBlockCache().getStats().getHitRatio() * 100) %><% "%" %></td>
-        <td>Hit Count divided by total requests count</td>
-    </tr>
+    <& hits_tmpl; bc = cacheConfig.getBlockCache(); &>

 If the block cache is made up of more than one cache -- i.e. an L1 and an L2 -- then the above
 are combined counts. The request count is the sum of hits and misses.

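As a worked illustration with made-up numbers: if the L1 reports 700 hits and 60 misses while the L2 reports 200 hits and 40 misses, the combined view shows 900 hits, 100 misses, and a hit ratio of 900 / (900 + 100) = 90.00%.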
@@ -349,7 +357,9 @@ are combined counts. Request count is sum of hits and misses.

         <td>Size of DATA Blocks</td>
-<%if evictions %><& evictions_tmpl; bc = bc; &></%if>
+<& evictions_tmpl; bc = bc; &>
+<& hits_tmpl; bc = bc; &>
+
 <%if bucketCache %>
         <td>Hits per Second</td>
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
index f212f14f45c..5d221c8efb8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory;
 import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -126,8 +127,25 @@ public class CacheConfig {
    */
   public static final String BLOCKCACHE_BLOCKSIZE_KEY = "hbase.offheapcache.minblocksize";
 
-  // Defaults
+  private static final String EXTERNAL_BLOCKCACHE_KEY = "hbase.blockcache.use.external";
+  private static final boolean EXTERNAL_BLOCKCACHE_DEFAULT = false;
+  private static final String EXTERNAL_BLOCKCACHE_CLASS_KEY = "hbase.blockcache.external.class";
+
+  /**
+   * Enum of all built in external block caches.
+   * This is used for config.
+   */
+  private static enum ExternalBlockCaches {
+    memcached(MemcachedBlockCache.class);
+    // TODO(eclark): Consider more. Redis, etc.
+    Class<? extends BlockCache> clazz;
+    ExternalBlockCaches(Class<? extends BlockCache> clazz) {
+      this.clazz = clazz;
+    }
+  }
+
+  // Defaults
   public static final boolean DEFAULT_CACHE_DATA_ON_READ = true;
   public static final boolean DEFAULT_CACHE_DATA_ON_WRITE = false;
   public static final boolean DEFAULT_IN_MEMORY = false;
@@ -478,7 +496,44 @@ public class CacheConfig {
    * @return Returns L2 block cache instance (for now it is BucketCache BlockCache all the time)
    * or null if not supposed to be a L2.
    */
-  private static BucketCache getL2(final Configuration c, final MemoryUsage mu) {
+  private static BlockCache getL2(final Configuration c, final MemoryUsage mu) {
+    final boolean useExternal = c.getBoolean(EXTERNAL_BLOCKCACHE_KEY, EXTERNAL_BLOCKCACHE_DEFAULT);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Trying to use " + (useExternal ? "External" : "Internal") + " l2 cache");
+    }
+
+    // If we want to use an external block cache then create that.
+    if (useExternal) {
+      return getExternalBlockcache(c);
+    }
+
+    // Otherwise use the bucket cache.
+    return getBucketCache(c, mu);
+
+  }
+
+  private static BlockCache getExternalBlockcache(Configuration c) {
+    Class klass = null;
+
+    // Get the class from the config.
+    try {
+      klass = ExternalBlockCaches.valueOf(c.get(EXTERNAL_BLOCKCACHE_CLASS_KEY, "memcached")).clazz;
+    } catch (IllegalArgumentException exception) {
+      klass = c.getClass(EXTERNAL_BLOCKCACHE_CLASS_KEY, MemcachedBlockCache.class);
+    }
+
+    // Now try and create an instance of the block cache.
+    try {
+      LOG.info("Creating external block cache of type: " + klass);
+      return (BlockCache) ReflectionUtils.newInstance(klass, c);
+    } catch (Exception e) {
+      LOG.warn("Error creating external block cache", e);
+    }
+    return null;
+
+  }
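For reviewers trying this out: the keys above would normally live in hbase-site.xml. A minimal programmatic sketch of the same settings (the class name and server addresses are illustrative only; the memcached server-list key is defined in MemcachedBlockCache later in this patch):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class ExternalCacheConfigSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Turn on the external L2 block cache added by this patch (defaults to false).
    conf.setBoolean("hbase.blockcache.use.external", true);
    // "memcached" is the only built-in ExternalBlockCaches value; a fully
    // qualified class name is also accepted via the getClass() fallback above.
    conf.set("hbase.blockcache.external.class", "memcached");
    // Comma separated host:port pairs, read by MemcachedBlockCache (added below).
    conf.set("hbase.cache.memcached.servers", "host1:11211,host2:11211");
  }
}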
+  private static BlockCache getBucketCache(Configuration c, MemoryUsage mu) {
     // Check for L2.  ioengine name must be non-null.
     String bucketCacheIOEngineName = c.get(BUCKET_CACHE_IOENGINE_KEY, null);
     if (bucketCacheIOEngineName == null || bucketCacheIOEngineName.length() <= 0) return null;
@@ -533,22 +588,27 @@ public class CacheConfig {
     LruBlockCache l1 = getL1(conf, mu);
     // blockCacheDisabled is set as a side-effect of getL1(), so check it again after the call.
     if (blockCacheDisabled) return null;
-    BucketCache l2 = getL2(conf, mu);
+    BlockCache l2 = getL2(conf, mu);
     if (l2 == null) {
       GLOBAL_BLOCK_CACHE_INSTANCE = l1;
     } else {
+      boolean useExternal = conf.getBoolean(EXTERNAL_BLOCKCACHE_KEY, EXTERNAL_BLOCKCACHE_DEFAULT);
       boolean combinedWithLru = conf.getBoolean(BUCKET_CACHE_COMBINED_KEY,
         DEFAULT_BUCKET_CACHE_COMBINED);
-      if (combinedWithLru) {
-        GLOBAL_BLOCK_CACHE_INSTANCE = new CombinedBlockCache(l1, l2);
+      if (useExternal) {
+        GLOBAL_BLOCK_CACHE_INSTANCE = new InclusiveCombinedBlockCache(l1, l2);
       } else {
-        // L1 and L2 are not 'combined'.  They are connected via the LruBlockCache victimhandler
-        // mechanism.  It is a little ugly but works according to the following: when the
-        // background eviction thread runs, blocks evicted from L1 will go to L2 AND when we get
-        // a block from the L1 cache, if not in L1, we will search L2.
-        l1.setVictimCache(l2);
-        GLOBAL_BLOCK_CACHE_INSTANCE = l1;
+        if (combinedWithLru) {
+          GLOBAL_BLOCK_CACHE_INSTANCE = new CombinedBlockCache(l1, l2);
+        } else {
+          // L1 and L2 are not 'combined'.  They are connected via the LruBlockCache victimhandler
+          // mechanism.  It is a little ugly but works according to the following: when the
+          // background eviction thread runs, blocks evicted from L1 will go to L2 AND when we get
+          // a block from the L1 cache, if not in L1, we will search L2.
+          GLOBAL_BLOCK_CACHE_INSTANCE = l1;
+        }
       }
+      l1.setVictimCache(l2);
     }
     return GLOBAL_BLOCK_CACHE_INSTANCE;
   }
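The hunk above changes how instantiateBlockCache() composes the two tiers. A condensed sketch of the resulting decision (a hypothetical helper that mirrors the patched logic, not part of the patch):

import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache;
import org.apache.hadoop.hbase.io.hfile.InclusiveCombinedBlockCache;
import org.apache.hadoop.hbase.io.hfile.LruBlockCache;

public class CacheWiringSketch {
  static BlockCache wire(LruBlockCache l1, BlockCache l2,
      boolean useExternal, boolean combinedWithLru) {
    if (l2 == null) {
      return l1; // no L2 configured; the LRU cache stands alone
    }
    BlockCache global;
    if (useExternal) {
      // External L2 (e.g. memcached): inclusive policy, blocks written to both tiers.
      global = new InclusiveCombinedBlockCache(l1, l2);
    } else if (combinedWithLru) {
      // BucketCache L2: meta blocks stay in L1, data blocks go to L2.
      global = new CombinedBlockCache(l1, l2);
    } else {
      global = l1; // L2 is reached only through the victim handler
    }
    // Unconditional after this patch: L1 misses and evictions consult/feed the L2.
    l1.setVictimCache(l2);
    return global;
  }
}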
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java
index 52a5793a451..7725cf9290b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java
@@ -25,32 +25,37 @@ import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory;
 import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
 
+
 /**
  * CombinedBlockCache is an abstraction layer that combines
  * {@link LruBlockCache} and {@link BucketCache}. The smaller lruCache is used
- * to cache bloom blocks and index blocks.  The larger bucketCache is used to
+ * to cache bloom blocks and index blocks.  The larger l2Cache is used to
  * cache data blocks. {@link #getBlock(BlockCacheKey, boolean, boolean, boolean)} reads
- * first from the smaller lruCache before looking for the block in the bucketCache.  Blocks evicted
+ * first from the smaller lruCache before looking for the block in the l2Cache.  Blocks evicted
  * from lruCache are put into the bucket cache.
  * Metrics are the combined size and hits and misses of both caches.
  *
  */
 @InterfaceAudience.Private
 public class CombinedBlockCache implements ResizableBlockCache, HeapSize {
-  private final LruBlockCache lruCache;
-  private final BucketCache bucketCache;
-  private final CombinedCacheStats combinedCacheStats;
+  protected final LruBlockCache lruCache;
+  protected final BlockCache l2Cache;
+  protected final CombinedCacheStats combinedCacheStats;
 
-  public CombinedBlockCache(LruBlockCache lruCache, BucketCache bucketCache) {
+  public CombinedBlockCache(LruBlockCache lruCache, BlockCache l2Cache) {
     this.lruCache = lruCache;
-    this.bucketCache = bucketCache;
+    this.l2Cache = l2Cache;
     this.combinedCacheStats = new CombinedCacheStats(lruCache.getStats(),
-        bucketCache.getStats());
+        l2Cache.getStats());
   }
 
   @Override
   public long heapSize() {
-    return lruCache.heapSize() + bucketCache.heapSize();
+    long l2size = 0;
+    if (l2Cache instanceof HeapSize) {
+      l2size = ((HeapSize) l2Cache).heapSize();
+    }
+    return lruCache.heapSize() + l2size;
   }
 
   @Override
@@ -60,7 +65,7 @@ public class CombinedBlockCache implements ResizableBlockCache, HeapSize {
     if (isMetaBlock || cacheDataInL1) {
       lruCache.cacheBlock(cacheKey, buf, inMemory, cacheDataInL1);
     } else {
-      bucketCache.cacheBlock(cacheKey, buf, inMemory, cacheDataInL1);
+      l2Cache.cacheBlock(cacheKey, buf, inMemory, false);
     }
   }
 
@@ -73,22 +78,24 @@ public class CombinedBlockCache implements ResizableBlockCache, HeapSize {
   public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching,
       boolean repeat, boolean updateCacheMetrics) {
     // TODO: is there a hole here, or just awkwardness since in the lruCache getBlock
-    // we end up calling bucketCache.getBlock.
+    // we end up calling l2Cache.getBlock.
     if (lruCache.containsBlock(cacheKey)) {
       return lruCache.getBlock(cacheKey, caching, repeat, updateCacheMetrics);
     }
-    return bucketCache.getBlock(cacheKey, caching, repeat, updateCacheMetrics);
+    Cacheable result = l2Cache.getBlock(cacheKey, caching, repeat, updateCacheMetrics);
+
+    return result;
   }
 
   @Override
   public boolean evictBlock(BlockCacheKey cacheKey) {
-    return lruCache.evictBlock(cacheKey) || bucketCache.evictBlock(cacheKey);
+    return lruCache.evictBlock(cacheKey) || l2Cache.evictBlock(cacheKey);
   }
 
   @Override
   public int evictBlocksByHfileName(String hfileName) {
     return lruCache.evictBlocksByHfileName(hfileName)
-        + bucketCache.evictBlocksByHfileName(hfileName);
+        + l2Cache.evictBlocksByHfileName(hfileName);
   }
 
   @Override
@@ -99,27 +106,27 @@ public class CombinedBlockCache implements ResizableBlockCache, HeapSize {
   @Override
   public void shutdown() {
     lruCache.shutdown();
-    bucketCache.shutdown();
+    l2Cache.shutdown();
   }
 
   @Override
   public long size() {
-    return lruCache.size() + bucketCache.size();
+    return lruCache.size() + l2Cache.size();
   }
 
   @Override
   public long getFreeSize() {
-    return lruCache.getFreeSize() + bucketCache.getFreeSize();
+    return lruCache.getFreeSize() + l2Cache.getFreeSize();
   }
 
   @Override
   public long getCurrentSize() {
-    return lruCache.getCurrentSize() + bucketCache.getCurrentSize();
+    return lruCache.getCurrentSize() + l2Cache.getCurrentSize();
   }
 
   @Override
   public long getBlockCount() {
-    return lruCache.getBlockCount() + bucketCache.getBlockCount();
+    return lruCache.getBlockCount() + l2Cache.getBlockCount();
   }
 
   private static class CombinedCacheStats extends CacheStats {
@@ -205,7 +212,7 @@ public class CombinedBlockCache implements ResizableBlockCache, HeapSize {
 
   @Override
   public BlockCache[] getBlockCaches() {
-    return new BlockCache [] {this.lruCache, this.bucketCache};
+    return new BlockCache [] {this.lruCache, this.l2Cache};
   }
 
   @Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
index 3d4c104e5d1..b8303b860a8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
@@ -116,7 +116,7 @@ public class HFileBlock implements Cacheable {
    */
   static final int CHECKSUM_SIZE = Bytes.SIZEOF_INT;
 
-  private static final CacheableDeserializer<Cacheable> blockDeserializer =
+  static final CacheableDeserializer<Cacheable> blockDeserializer =
       new CacheableDeserializer<Cacheable>() {
         public HFileBlock deserialize(ByteBuffer buf, boolean reuse) throws IOException{
           buf.limit(buf.limit() - HFileBlock.EXTRA_SERIALIZATION_SPACE).rewind();
@@ -130,13 +130,13 @@ public class HFileBlock implements Cacheable {
           buf.position(buf.limit());
           buf.limit(buf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE);
           boolean usesChecksum = buf.get() == (byte)1;
-          HFileBlock ourBuffer = new HFileBlock(newByteBuffer, usesChecksum);
-          ourBuffer.offset = buf.getLong();
-          ourBuffer.nextBlockOnDiskSizeWithHeader = buf.getInt();
-          if (ourBuffer.hasNextBlockHeader()) {
-            ourBuffer.buf.limit(ourBuffer.buf.limit() - ourBuffer.headerSize());
+          HFileBlock hFileBlock = new HFileBlock(newByteBuffer, usesChecksum);
+          hFileBlock.offset = buf.getLong();
+          hFileBlock.nextBlockOnDiskSizeWithHeader = buf.getInt();
+          if (hFileBlock.hasNextBlockHeader()) {
+            hFileBlock.buf.limit(hFileBlock.buf.limit() - hFileBlock.headerSize());
           }
-          return ourBuffer;
+          return hFileBlock;
         }
 
         @Override
@@ -671,7 +671,7 @@ public class HFileBlock implements Cacheable {
    * @return true if succeeded reading the extra bytes
    * @throws IOException if failed to read the necessary bytes
    */
-  public static boolean readWithExtra(InputStream in, byte buf[],
+  public static boolean readWithExtra(InputStream in, byte[] buf,
       int bufOffset, int necessaryLen, int extraLen) throws IOException {
     int bytesRemaining = necessaryLen + extraLen;
     while (bytesRemaining > 0) {
@@ -777,7 +777,8 @@ public class HFileBlock implements Cacheable {
   /**
    * Valid in the READY state. Contains the header and the uncompressed (but
    * potentially encoded, if this is a data block) bytes, so the length is
-   * {@link #uncompressedSizeWithoutHeader} + {@link org.apache.hadoop.hbase.HConstants#HFILEBLOCK_HEADER_SIZE}.
+   * {@link #uncompressedSizeWithoutHeader} +
+   * {@link org.apache.hadoop.hbase.HConstants#HFILEBLOCK_HEADER_SIZE}.
    * Does not store checksums.
   */
  private byte[] uncompressedBytesWithHeader;
 
@@ -1060,7 +1061,9 @@
    */
   int getOnDiskSizeWithoutHeader() {
     expectState(State.BLOCK_READY);
-    return onDiskBytesWithHeader.length + onDiskChecksum.length - HConstants.HFILEBLOCK_HEADER_SIZE;
+    return onDiskBytesWithHeader.length
+        + onDiskChecksum.length
+        - HConstants.HFILEBLOCK_HEADER_SIZE;
   }
 
   /**
@@ -1820,7 +1823,8 @@
     if (!fileContext.isUseHBaseChecksum() || this.fileContext.getBytesPerChecksum() == 0) {
       return 0;
     }
-    return (int)ChecksumUtil.numBytes(onDiskDataSizeWithHeader, this.fileContext.getBytesPerChecksum());
+    return (int) ChecksumUtil.numBytes(onDiskDataSizeWithHeader,
+        this.fileContext.getBytesPerChecksum());
   }
 
   /**
@@ -1874,8 +1878,8 @@
     byte[] magicBuf = new byte[Math.min(buf.limit() - buf.position(), BlockType.MAGIC_LENGTH)];
     buf.get(magicBuf);
     BlockType bt = BlockType.parse(magicBuf, 0, BlockType.MAGIC_LENGTH);
-    int compressedBlockSizeNoHeader = buf.getInt();;
-    int uncompressedBlockSizeNoHeader = buf.getInt();;
+    int compressedBlockSizeNoHeader = buf.getInt();
+    int uncompressedBlockSizeNoHeader = buf.getInt();
     long prevBlockOffset = buf.getLong();
     byte cksumtype = buf.get();
     long bytesPerChecksum = buf.getInt();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/InclusiveCombinedBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/InclusiveCombinedBlockCache.java
new file mode 100644
index 00000000000..667e7b4c14b
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/InclusiveCombinedBlockCache.java
@@ -0,0 +1,58 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hbase.io.hfile;
+
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+public class InclusiveCombinedBlockCache extends CombinedBlockCache implements BlockCache {
+  public InclusiveCombinedBlockCache(LruBlockCache l1, BlockCache l2) {
+    super(l1, l2);
+  }
+
+  @Override
+  public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching,
+                            boolean repeat, boolean updateCacheMetrics) {
+    // On all external cache set-ups the LRU cache should have the L2 cache set as its
+    // victim handler, so any request that misses in the LRU block cache will be
+    // retried in the L2 block cache.
+    return lruCache.getBlock(cacheKey, caching, repeat, updateCacheMetrics);
+  }
+
+  /**
+   *
+   * @param cacheKey The block's cache key.
+   * @param buf The block contents wrapped in a ByteBuffer.
+   * @param inMemory Whether block should be treated as in-memory. This parameter is only useful
+   *                 for the L1 lru cache.
+   * @param cacheDataInL1 Ignored here; the inclusive cache stores every block in both tiers.
+   */
+  @Override
+  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory,
+                         final boolean cacheDataInL1) {
+    // This is the inclusive part of the combined block cache.
+    // Every block is placed into both block caches.
+    lruCache.cacheBlock(cacheKey, buf, inMemory, true);
+
+    // This assumes that insertion into the L2 block cache is either async or very fast.
+    l2Cache.cacheBlock(cacheKey, buf, inMemory, true);
+  }
+}
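For orientation, a minimal wiring sketch of the inclusive set-up this class enables (the sizes, class name, and reachable-memcached assumption are all illustrative):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.InclusiveCombinedBlockCache;
import org.apache.hadoop.hbase.io.hfile.LruBlockCache;
import org.apache.hadoop.hbase.io.hfile.MemcachedBlockCache;

public class InclusiveCacheWiringSketch {
  static BlockCache build() throws IOException {
    Configuration conf = HBaseConfiguration.create();
    LruBlockCache l1 = new LruBlockCache(32 * 1024 * 1024, 64 * 1024); // 32 MB L1, 64 KB blocks
    BlockCache l2 = new MemcachedBlockCache(conf); // external L2 tier
    // L1 misses fall through to the L2 and, on a hit there, are promoted back into L1.
    l1.setVictimCache(l2);
    // cacheBlock() writes to both tiers; getBlock() asks the L1, which consults the victim cache.
    return new InclusiveCombinedBlockCache(l1, l2);
  }
}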
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
index d7da5f394a8..48e4cadd1ce 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
@@ -199,8 +199,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
   private boolean forceInMemory;
 
   /** Where to send victims (blocks evicted/missing from the cache) */
-  // TODO: Fix it so this is not explicit reference to a particular BlockCache implementation.
-  private BucketCache victimHandler = null;
+  private BlockCache victimHandler = null;
 
   /**
    * Default constructor.  Specify maximum size and expected average block
@@ -421,8 +420,17 @@
     LruCachedBlock cb = map.get(cacheKey);
     if (cb == null) {
       if (!repeat && updateCacheMetrics) stats.miss(caching);
-      if (victimHandler != null) {
-        return victimHandler.getBlock(cacheKey, caching, repeat, updateCacheMetrics);
+      // If there is another block cache then try to read from there.
+      // However, if this is a repeat (the second lookup in double-checked locking),
+      // a miss here means the L2 would also miss, so skip it.
+      if (victimHandler != null && !repeat) {
+        Cacheable result = victimHandler.getBlock(cacheKey, caching, repeat, updateCacheMetrics);
+
+        // Promote this to L1.
+        if (result != null && caching) {
+          cacheBlock(cacheKey, result, /* inMemory = */ false, /* cacheData = */ true);
+        }
+        return result;
       }
       return null;
     }
@@ -491,10 +499,14 @@
     }
     stats.evicted(block.getCachedTime());
     if (evictedByEvictionProcess && victimHandler != null) {
-      boolean wait = getCurrentSize() < acceptableSize();
-      boolean inMemory = block.getPriority() == BlockPriority.MEMORY;
-      victimHandler.cacheBlockWithWait(block.getCacheKey(), block.getBuffer(),
-          inMemory, wait);
+      if (victimHandler instanceof BucketCache) {
+        boolean wait = getCurrentSize() < acceptableSize();
+        boolean inMemory = block.getPriority() == BlockPriority.MEMORY;
+        ((BucketCache)victimHandler).cacheBlockWithWait(block.getCacheKey(), block.getBuffer(),
+            inMemory, wait);
+      } else {
+        victimHandler.cacheBlock(block.getCacheKey(), block.getBuffer());
+      }
     }
     return block.heapSize();
   }
@@ -789,7 +801,10 @@
       synchronized(this) {
         try {
           this.wait(1000 * 10/*Don't wait for ever*/);
-        } catch(InterruptedException e) {}
+        } catch(InterruptedException e) {
+          LOG.warn("Interrupted eviction thread", e);
+          Thread.currentThread().interrupt();
+        }
       }
       LruBlockCache cache = this.cache.get();
       if (cache == null) break;
@@ -1059,7 +1074,7 @@
     return counts;
   }
 
-  public void setVictimCache(BucketCache handler) {
+  public void setVictimCache(BlockCache handler) {
     assert victimHandler == null;
     victimHandler = handler;
   }
@@ -1069,7 +1084,7 @@
     return map;
   }
 
-  BucketCache getVictimHandler() {
+  BlockCache getVictimHandler() {
     return this.victimHandler;
   }
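One sizing note on the class that follows: its MAX_SIZE of 1020 * 1024 = 1,044,480 bytes sits just under memcached's default 1 MB (1,048,576 byte) item limit, presumably leaving 4 KB of headroom for the key and item overhead.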
+ */ + +package org.apache.hadoop.hbase.io.hfile; + +import net.spy.memcached.CachedData; +import net.spy.memcached.ConnectionFactoryBuilder; +import net.spy.memcached.FailureMode; +import net.spy.memcached.MemcachedClient; +import net.spy.memcached.transcoders.Transcoder; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.Addressing; +import org.apache.htrace.Trace; +import org.apache.htrace.TraceScope; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.concurrent.ExecutionException; + +/** + * Class to store blocks into memcached. + * This should only be used on a cluster of Memcached daemons that are tuned well and have a + * good network connection to the HBase regionservers. Any other use will likely slow down HBase + * greatly. + */ +@InterfaceAudience.Private +public class MemcachedBlockCache implements BlockCache { + private static final Log LOG = LogFactory.getLog(MemcachedBlockCache.class.getName()); + + // Some memcache versions won't take more than 1024 * 1024. So set the limit below + // that just in case this client is used with those versions. + public static final int MAX_SIZE = 1020 * 1024; + + // Config key for what memcached servers to use. + // They should be specified in a comma sperated list with ports. + // like: + // + // host1:11211,host3:8080,host4:11211 + public static final String MEMCACHED_CONFIG_KEY = "hbase.cache.memcached.servers"; + public static final String MEMCACHED_TIMEOUT_KEY = "hbase.cache.memcached.timeout"; + public static final String MEMCACHED_OPTIMEOUT_KEY = "hbase.cache.memcached.optimeout"; + public static final long MEMCACHED_DEFAULT_TIMEOUT = 500; + + private final MemcachedClient client; + private final HFileBlockTranscoder tc = new HFileBlockTranscoder(); + private final CacheStats cacheStats = new CacheStats("MemcachedBlockCache"); + + public MemcachedBlockCache(Configuration c) throws IOException { + LOG.info("Creating MemcachedBlockCache"); + + long opTimeout = c.getLong(MEMCACHED_OPTIMEOUT_KEY, MEMCACHED_DEFAULT_TIMEOUT); + long queueTimeout = c.getLong(MEMCACHED_TIMEOUT_KEY, opTimeout + MEMCACHED_DEFAULT_TIMEOUT); + + ConnectionFactoryBuilder builder = new ConnectionFactoryBuilder() + .setOpTimeout(opTimeout) + .setOpQueueMaxBlockTime(queueTimeout) // Cap the max time before anything times out + .setFailureMode(FailureMode.Redistribute) + .setShouldOptimize(true) // When regions move lots of reads happen together + // So combining them into single requests is nice. + .setDaemon(true) // Don't keep threads around past the end of days. + .setUseNagleAlgorithm(false) // Ain't nobody got time for that + .setReadBufferSize(HConstants.DEFAULT_BLOCKSIZE * 4 * 1024); // 4 times larger than the + // default block just in case + + + // Assume only the localhost is serving memecached. + // A la mcrouter or co-locating memcached with split regionservers. + // + // If this config is a pool of memecached servers they will all be used according to the + // default hashing scheme defined by the memcache client. Spy Memecache client in this + // case. 
+ String serverListString = c.get(MEMCACHED_CONFIG_KEY,"localhost:11211"); + String[] servers = serverListString.split(","); + List serverAddresses = new ArrayList(servers.length); + for (String s:servers) { + serverAddresses.add(Addressing.createInetSocketAddressFromHostAndPortStr(s)); + } + + client = new MemcachedClient(builder.build(), serverAddresses); + } + + @Override + public void cacheBlock(BlockCacheKey cacheKey, + Cacheable buf, + boolean inMemory, + boolean cacheDataInL1) { + cacheBlock(cacheKey, buf); + } + + @Override + public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) { + if (buf instanceof HFileBlock) { + client.add(cacheKey.toString(), MAX_SIZE, (HFileBlock) buf, tc); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("MemcachedBlockCache can not cache Cacheable's of type " + + buf.getClass().toString()); + } + } + } + + @Override + public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, + boolean repeat, boolean updateCacheMetrics) { + // Assume that nothing is the block cache + HFileBlock result = null; + + try (TraceScope traceScope = Trace.startSpan("MemcachedBlockCache.getBlock")) { + result = client.get(cacheKey.toString(), tc); + } catch (Exception e) { + // Catch a pretty broad set of exceptions to limit any changes in the memecache client + // and how it handles failures from leaking into the read path. + if (LOG.isDebugEnabled()) { + LOG.debug("Exception pulling from memcached [ " + + cacheKey.toString() + + " ]. Treating as a miss.", e); + } + result = null; + } finally { + // Update stats if this request doesn't have it turned off 100% of the time + if (updateCacheMetrics) { + if (result == null) { + cacheStats.miss(caching); + } else { + cacheStats.hit(caching); + } + } + } + + + return result; + } + + @Override + public boolean evictBlock(BlockCacheKey cacheKey) { + try { + cacheStats.evict(); + return client.delete(cacheKey.toString()).get(); + } catch (InterruptedException e) { + LOG.warn("Error deleting " + cacheKey.toString(), e); + Thread.currentThread().interrupt(); + } catch (ExecutionException e) { + if (LOG.isDebugEnabled()) { + LOG.debug("Error deleting " + cacheKey.toString(), e); + } + } + return false; + } + + /** + * This method does nothing so that memcached can handle all evictions. + */ + @Override + public int evictBlocksByHfileName(String hfileName) { + return 0; + } + + @Override + public CacheStats getStats() { + return cacheStats; + } + + @Override + public void shutdown() { + client.shutdown(); + } + + @Override + public long size() { + return 0; + } + + @Override + public long getFreeSize() { + return 0; + } + + @Override + public long getCurrentSize() { + return 0; + } + + @Override + public long getBlockCount() { + return 0; + } + + @Override + public Iterator iterator() { + return new Iterator() { + @Override + public boolean hasNext() { + return false; + } + + @Override + public CachedBlock next() { + throw new NoSuchElementException("MemcachedBlockCache can't iterate over blocks."); + } + + @Override + public void remove() { + + } + }; + } + + @Override + public BlockCache[] getBlockCaches() { + return null; + } + + /** + * Class to encode and decode an HFileBlock to and from memecached's resulting byte arrays. 
+ */ + private static class HFileBlockTranscoder implements Transcoder { + + @Override + public boolean asyncDecode(CachedData d) { + return false; + } + + @Override + public CachedData encode(HFileBlock block) { + ByteBuffer bb = ByteBuffer.allocate(block.getSerializedLength()); + block.serialize(bb); + return new CachedData(0, bb.array(), CachedData.MAX_SIZE); + } + + @Override + public HFileBlock decode(CachedData d) { + try { + ByteBuffer buf = ByteBuffer.wrap(d.getData()); + return (HFileBlock) HFileBlock.blockDeserializer.deserialize(buf, true); + } catch (IOException e) { + LOG.warn("Error deserializing data from memcached",e); + } + return null; + } + + @Override + public int getMaxSize() { + return MAX_SIZE; + } + } + +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheConfig.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheConfig.java index 4d547c72481..4671f3ad56d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheConfig.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheConfig.java @@ -282,9 +282,9 @@ public class TestCacheConfig { // TODO: Assert sizes allocated are right and proportions. LruBlockCache lbc = (LruBlockCache)cc.getBlockCache(); assertEquals(lruExpectedSize, lbc.getMaxSize()); - BucketCache bc = lbc.getVictimHandler(); + BlockCache bc = lbc.getVictimHandler(); // getMaxSize comes back in bytes but we specified size in MB - assertEquals(bcExpectedSize, bc.getMaxSize()); + assertEquals(bcExpectedSize, ((BucketCache) bc).getMaxSize()); // Test the L1+L2 deploy works as we'd expect with blocks evicted from L1 going to L2. long initialL1BlockCount = lbc.getBlockCount(); long initialL2BlockCount = bc.getBlockCount(); diff --git a/pom.xml b/pom.xml index 38cab6ac312..6905bfceccf 100644 --- a/pom.xml +++ b/pom.xml @@ -1100,6 +1100,7 @@ 4.0.23.Final 2.1.2 1.0.8 + 2.11.6 2.4 1.6 @@ -1603,6 +1604,12 @@ disruptor ${disruptor.version}
+      <dependency>
+        <groupId>net.spy</groupId>
+        <artifactId>spymemcached</artifactId>
+        <version>${spy.version}</version>
+        <optional>true</optional>
+      </dependency>
       <dependency>
         <groupId>org.jmock</groupId>
         <artifactId>jmock-junit4</artifactId>
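Finally, to sanity-check the new cache path end to end, a smoke test along these lines could be run against a local memcached daemon (the class name and localhost address are illustrative assumptions):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.MemcachedBlockCache;

public class MemcachedCacheSmokeTest {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    conf.set("hbase.cache.memcached.servers", "localhost:11211");
    MemcachedBlockCache cache = new MemcachedBlockCache(conf);

    BlockCacheKey key = new BlockCacheKey("no-such-hfile", 0L);
    // An uncached key should come back null and be counted as a miss;
    // client errors are also swallowed and treated as misses.
    System.out.println("getBlock -> " + cache.getBlock(key, true, false, true));
    System.out.println("misses   -> " + cache.getStats().getMissCount());
    cache.shutdown();
  }
}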