diff --git a/hbase-external-blockcache/pom.xml b/hbase-external-blockcache/pom.xml
index 79af809f6f1..13f463d9544 100644
--- a/hbase-external-blockcache/pom.xml
+++ b/hbase-external-blockcache/pom.xml
@@ -51,7 +51,63 @@
org.slf4j
- slf4j-api
+ jcl-over-slf4j
+ test
+
+
+ org.slf4j
+ jul-to-slf4j
+ test
+
+
+ org.apache.logging.log4j
+ log4j-api
+ test
+
+
+ org.apache.logging.log4j
+ log4j-core
+ test
+
+
+ org.apache.logging.log4j
+ log4j-slf4j-impl
+ test
+
+
+ org.apache.logging.log4j
+ log4j-1.2-api
+ test
+
+
+ org.apache.hbase
+ hbase-logging
+ test-jar
+ test
+
+
+ org.apache.hbase
+ hbase-common
+ test-jar
+ test
+
+
+ org.apache.hbase
+ hbase-zookeeper
+ test-jar
+ test
+
+
+ org.apache.hbase
+ hbase-server
+ test-jar
+ test
+
+
+ com.thimbleware.jmemcached
+ jmemcached-core
+ 1.0.0
+ test
junit
diff --git a/hbase-external-blockcache/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java b/hbase-external-blockcache/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java
index d5c6bff8cd0..2e8811e93a7 100644
--- a/hbase-external-blockcache/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java
+++ b/hbase-external-blockcache/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java
@@ -27,28 +27,36 @@ import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import net.spy.memcached.CachedData;
import net.spy.memcached.ConnectionFactoryBuilder;
import net.spy.memcached.FailureMode;
import net.spy.memcached.MemcachedClient;
+import net.spy.memcached.OperationTimeoutException;
import net.spy.memcached.transcoders.Transcoder;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Addressing;
+import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
/**
* Class to store blocks into memcached. This should only be used on a cluster of Memcached daemons
* that are tuned well and have a good network connection to the HBase regionservers. Any other use
* will likely slow down HBase greatly.
*/
@InterfaceAudience.Private
+@SuppressWarnings("FutureReturnValueIgnored")
public class MemcachedBlockCache implements BlockCache {
private static final Logger LOG = LoggerFactory.getLogger(MemcachedBlockCache.class.getName());
@@ -56,6 +64,10 @@ public class MemcachedBlockCache implements BlockCache {
// that just in case this client is used with those versions.
public static final int MAX_SIZE = 1020 * 1024;
+ // Start memcached with -I to ensure it has the ability to store blocks of this size
+ public static final int MAX_TIME = 60 * 60 * 24 * 30; // 30 days, max allowed per the memcached
+ // spec
+
// Config key for what memcached servers to use.
// They should be specified in a comma sperated list with ports.
// like:
@@ -67,10 +79,20 @@ public class MemcachedBlockCache implements BlockCache {
public static final String MEMCACHED_OPTIMIZE_KEY = "hbase.cache.memcached.spy.optimze";
public static final long MEMCACHED_DEFAULT_TIMEOUT = 500;
public static final boolean MEMCACHED_OPTIMIZE_DEFAULT = false;
+ public static final int STAT_THREAD_PERIOD = 60 * 5;
private final MemcachedClient client;
private final HFileBlockTranscoder tc = new HFileBlockTranscoder();
private final CacheStats cacheStats = new CacheStats("MemcachedBlockCache");
+ private final AtomicLong cachedCount = new AtomicLong();
+ private final AtomicLong notCachedCount = new AtomicLong();
+ private final AtomicLong cacheErrorCount = new AtomicLong();
+ private final AtomicLong timeoutCount = new AtomicLong();
+
+ /** Statistics thread schedule pool (for heavy debugging, could remove) */
+ private transient final ScheduledExecutorService scheduleThreadPool =
+ Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
+ .setNameFormat("MemcachedBlockCacheStatsExecutor").setDaemon(true).build());
public MemcachedBlockCache(Configuration c) throws IOException {
LOG.info("Creating MemcachedBlockCache");
@@ -80,18 +102,15 @@ public class MemcachedBlockCache implements BlockCache {
boolean optimize = c.getBoolean(MEMCACHED_OPTIMIZE_KEY, MEMCACHED_OPTIMIZE_DEFAULT);
ConnectionFactoryBuilder builder =
- // Cap the max time before anything times out
new ConnectionFactoryBuilder().setOpTimeout(opTimeout).setOpQueueMaxBlockTime(queueTimeout)
- // Don't keep threads around past the end of days.
.setFailureMode(FailureMode.Redistribute).setShouldOptimize(optimize).setDaemon(true)
- .setUseNagleAlgorithm(false) // Ain't nobody got time for that
- .setReadBufferSize(HConstants.DEFAULT_BLOCKSIZE * 4 * 1024); // Much larger just in case
+ .setUseNagleAlgorithm(false).setReadBufferSize(MAX_SIZE);
- // Assume only the localhost is serving memecached.
+ // Assume only the localhost is serving memcached.
// A la mcrouter or co-locating memcached with split regionservers.
//
- // If this config is a pool of memecached servers they will all be used according to the
- // default hashing scheme defined by the memcache client. Spy Memecache client in this
+ // If this config is a pool of memcached servers they will all be used according to the
+ // default hashing scheme defined by the memcached client. Spy Memecache client in this
// case.
String serverListString = c.get(MEMCACHED_CONFIG_KEY, "localhost:11211");
String[] servers = serverListString.split(",");
@@ -104,6 +123,8 @@ public class MemcachedBlockCache implements BlockCache {
}
client = new MemcachedClient(builder.build(), serverAddresses);
+ this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), STAT_THREAD_PERIOD,
+ STAT_THREAD_PERIOD, TimeUnit.SECONDS);
}
@Override
@@ -111,22 +132,27 @@ public class MemcachedBlockCache implements BlockCache {
cacheBlock(cacheKey, buf);
}
- @SuppressWarnings("FutureReturnValueIgnored")
@Override
public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
if (buf instanceof HFileBlock) {
- client.add(cacheKey.toString(), MAX_SIZE, (HFileBlock) buf, tc).addListener(f -> {
+ if (buf.getSerializedLength() > MAX_SIZE) {
+ LOG.debug("Block of type {} with key {} is too large, size={}, max={}, will not cache",
+ buf.getClass(), cacheKey, buf.getSerializedLength(), MAX_SIZE);
+ notCachedCount.incrementAndGet();
+ return;
+ }
+ client.set(cacheKey.toString(), MAX_TIME, (HFileBlock) buf, tc).addListener(f -> {
try {
f.get();
- } catch (ExecutionException e) {
- LOG.warn("Failed to cache block", e);
+ cachedCount.incrementAndGet();
+ } catch (Exception e) {
+ LOG.warn("Failed to cache block with key " + cacheKey, e);
+ cacheErrorCount.incrementAndGet();
}
});
} else {
- if (LOG.isDebugEnabled()) {
- LOG.debug(
- "MemcachedBlockCache can not cache Cacheable's of type " + buf.getClass().toString());
- }
+ LOG.debug("Can not cache Cacheables of type {} with key {}", buf.getClass(), cacheKey);
+ notCachedCount.incrementAndGet();
}
}
@@ -139,17 +165,25 @@ public class MemcachedBlockCache implements BlockCache {
try (Scope traceScope = span.makeCurrent()) {
result = client.get(cacheKey.toString(), tc);
} catch (Exception e) {
- // Catch a pretty broad set of exceptions to limit any changes in the memecache client
+ // Catch a pretty broad set of exceptions to limit any changes in the memcache client
// and how it handles failures from leaking into the read path.
- if (LOG.isDebugEnabled()) {
- LOG.debug(
- "Exception pulling from memcached [ " + cacheKey.toString() + " ]. Treating as a miss.",
- e);
+ if (
+ (e instanceof OperationTimeoutException) || ((e instanceof RuntimeException)
+ && (e.getCause() instanceof OperationTimeoutException))
+ ) {
+ timeoutCount.incrementAndGet();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Timeout getting key " + cacheKey.toString(), e);
+ }
+ } else {
+ cacheErrorCount.incrementAndGet();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Exception getting key " + cacheKey.toString(), e);
+ }
}
result = null;
} finally {
span.end();
- // Update stats if this request doesn't have it turned off 100% of the time
if (updateCacheMetrics) {
if (result == null) {
cacheStats.miss(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
@@ -158,7 +192,6 @@ public class MemcachedBlockCache implements BlockCache {
}
}
}
-
return result;
}
@@ -194,6 +227,22 @@ public class MemcachedBlockCache implements BlockCache {
@Override
public void shutdown() {
client.shutdown();
+ this.scheduleThreadPool.shutdown();
+ for (int i = 0; i < 10; i++) {
+ if (!this.scheduleThreadPool.isShutdown()) {
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted while sleeping");
+ Thread.currentThread().interrupt();
+ break;
+ }
+ }
+ }
+ if (!this.scheduleThreadPool.isShutdown()) {
+ List runnables = this.scheduleThreadPool.shutdownNow();
+ LOG.debug("Still running " + runnables);
+ }
}
@Override
@@ -289,4 +338,38 @@ public class MemcachedBlockCache implements BlockCache {
return MAX_SIZE;
}
}
+
+ private static class StatisticsThread extends Thread {
+
+ private final MemcachedBlockCache c;
+
+ public StatisticsThread(MemcachedBlockCache c) {
+ super("MemcachedBlockCacheStats");
+ setDaemon(true);
+ this.c = c;
+ }
+
+ @Override
+ public void run() {
+ c.logStats();
+ }
+
+ }
+
+ public void logStats() {
+ LOG.info("cached=" + cachedCount.get() + ", notCached=" + notCachedCount.get()
+ + ", cacheErrors=" + cacheErrorCount.get() + ", timeouts=" + timeoutCount.get() + ", reads="
+ + cacheStats.getRequestCount() + ", " + "hits=" + cacheStats.getHitCount() + ", hitRatio="
+ + (cacheStats.getHitCount() == 0
+ ? "0"
+ : (StringUtils.formatPercent(cacheStats.getHitRatio(), 2) + ", "))
+ + "cachingAccesses=" + cacheStats.getRequestCachingCount() + ", " + "cachingHits="
+ + cacheStats.getHitCachingCount() + ", " + "cachingHitsRatio="
+ + (cacheStats.getHitCachingCount() == 0
+ ? "0,"
+ : (StringUtils.formatPercent(cacheStats.getHitCachingRatio(), 2) + ", "))
+ + "evictions=" + cacheStats.getEvictionCount() + ", " + "evicted="
+ + cacheStats.getEvictedCount() + ", " + "evictedPerRun=" + cacheStats.evictedPerEviction());
+ }
+
}
diff --git a/hbase-external-blockcache/src/test/java/org/apache/hadoop/hbase/io/hfile/TestMemcachedBlockCache.java b/hbase-external-blockcache/src/test/java/org/apache/hadoop/hbase/io/hfile/TestMemcachedBlockCache.java
new file mode 100644
index 00000000000..1e0cdf78c4d
--- /dev/null
+++ b/hbase-external-blockcache/src/test/java/org/apache/hadoop/hbase/io/hfile/TestMemcachedBlockCache.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import static org.junit.Assert.assertEquals;
+
+import com.thimbleware.jmemcached.CacheElement;
+import com.thimbleware.jmemcached.CacheImpl;
+import com.thimbleware.jmemcached.Key;
+import com.thimbleware.jmemcached.LocalCacheElement;
+import com.thimbleware.jmemcached.MemCacheDaemon;
+import com.thimbleware.jmemcached.storage.hash.ConcurrentLinkedHashMap;
+import com.thimbleware.jmemcached.storage.hash.ConcurrentLinkedHashMap.EvictionPolicy;
+import java.net.InetSocketAddress;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.io.hfile.CacheTestUtils.HFileBlockPair;
+import org.apache.hadoop.hbase.testclassification.IOTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ IOTests.class, SmallTests.class })
+public class TestMemcachedBlockCache {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestMemcachedBlockCache.class);
+
+ static MemCacheDaemon extends CacheElement> MEMCACHED;
+ static MemcachedBlockCache CACHE;
+
+ @Before
+ public void before() throws Exception {
+ MEMCACHED.getCache().flush_all();
+ assertEquals("Memcache is not empty", MEMCACHED.getCache().getCurrentItems(), 0);
+ }
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ int port = HBaseTestingUtil.randomFreePort();
+ MEMCACHED = createDaemon(port);
+ Configuration conf = new Configuration();
+ conf.set("hbase.cache.memcached.servers", "localhost:" + port);
+ CACHE = new MemcachedBlockCache(conf);
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ if (MEMCACHED != null) {
+ MEMCACHED.stop();
+ }
+ }
+
+ @Test
+ public void testCache() throws Exception {
+ final int NUM_BLOCKS = 10;
+ HFileBlockPair[] blocks =
+ CacheTestUtils.generateHFileBlocks(HConstants.DEFAULT_BLOCKSIZE, NUM_BLOCKS);
+ for (int i = 0; i < NUM_BLOCKS; i++) {
+ CACHE.cacheBlock(blocks[i].getBlockName(), blocks[i].getBlock());
+ }
+ Waiter.waitFor(new Configuration(), 10000,
+ () -> MEMCACHED.getCache().getCurrentItems() == NUM_BLOCKS);
+ }
+
+ @Test
+ public void testEviction() throws Exception {
+ final int NUM_BLOCKS = 10;
+ HFileBlockPair[] blocks =
+ CacheTestUtils.generateHFileBlocks(HConstants.DEFAULT_BLOCKSIZE, NUM_BLOCKS);
+ for (int i = 0; i < NUM_BLOCKS; i++) {
+ CACHE.cacheBlock(blocks[i].getBlockName(), blocks[i].getBlock());
+ }
+ Waiter.waitFor(new Configuration(), 10000,
+ () -> MEMCACHED.getCache().getCurrentItems() == NUM_BLOCKS);
+ for (int i = 0; i < NUM_BLOCKS; i++) {
+ CACHE.evictBlock(blocks[i].getBlockName());
+ }
+ Waiter.waitFor(new Configuration(), 10000, () -> MEMCACHED.getCache().getCurrentItems() == 0);
+ }
+
+ private static MemCacheDaemon extends CacheElement> createDaemon(int port) {
+ InetSocketAddress addr = new InetSocketAddress("localhost", port);
+ MemCacheDaemon daemon = new MemCacheDaemon();
+ ConcurrentLinkedHashMap cacheStorage =
+ ConcurrentLinkedHashMap.create(EvictionPolicy.LRU, 1000, 1024 * 1024);
+ daemon.setCache(new CacheImpl(cacheStorage));
+ daemon.setAddr(addr);
+ daemon.setVerbose(true);
+ daemon.start();
+ while (!daemon.isRunning()) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ return daemon;
+ }
+
+}