HBASE-27796 Improve MemcachedBlockCache (#5181)

Track and log better stats.
Fix our use of the memcached API so that we no longer accidentally expire keys immediately.
Use a default retention period of 30 days, the maximum the memcached spec allows.
Use set instead of add to store keys.
Gracefully handle failures to cache and read timeouts.
Add unit tests using jmemcached as a test dependency.
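
Two of these changes rest on basic memcached semantics: add stores a value only when the key is absent, so re-writing a hot block could silently no-op, while set stores unconditionally. A minimal sketch of the difference, not part of the patch, assuming a memcached daemon listening on localhost:11211:

import java.net.InetSocketAddress;
import java.util.Collections;
import net.spy.memcached.MemcachedClient;

// Illustrative only: why the patch stores with set() rather than add().
public class SetVsAdd {
  public static void main(String[] args) throws Exception {
    MemcachedClient client =
      new MemcachedClient(Collections.singletonList(new InetSocketAddress("localhost", 11211)));
    client.set("k", 3600, "v1").get(); // stores unconditionally
    client.add("k", 3600, "v2").get(); // returns false: the key already exists
    System.out.println(client.get("k")); // prints "v1"
    client.shutdown();
  }
}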

Signed-off-by: Viraj Jasani <vjasani@apache.org>
Author: Andrew Purtell, 2023-04-17 13:20:55 -07:00 (committed via GitHub)
Commit: d56e7f291f (parent: 31c4aea48c)
3 changed files with 287 additions and 24 deletions

diff --git a/hbase-external-blockcache/pom.xml b/hbase-external-blockcache/pom.xml

@@ -51,7 +51,63 @@
     </dependency>
     <dependency>
       <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-api</artifactId>
+      <artifactId>jcl-over-slf4j</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>jul-to-slf4j</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-api</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-slf4j-impl</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-1.2-api</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-logging</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-common</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-zookeeper</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-server</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.thimbleware.jmemcached</groupId>
+      <artifactId>jmemcached-core</artifactId>
+      <version>1.0.0</version>
+      <scope>test</scope>
+    </dependency>
     </dependency>
     <dependency>
       <groupId>junit</groupId>
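
The jmemcached-core artifact provides an in-process memcached daemon, so the new unit tests below run without an external service. In a deployment, MemcachedBlockCache is enabled through configuration rather than code; a hedged sketch of the programmatic equivalent, assuming the standard hbase.blockcache.use.external switch and the server-list key read by the class below:

import org.apache.hadoop.conf.Configuration;

// Sketch: settings that route the block cache to memcached. In practice
// these belong in hbase-site.xml; hostnames here are placeholders.
public class ExternalBlockCacheConfig {
  public static Configuration memcachedBlockCacheConf() {
    Configuration conf = new Configuration();
    conf.setBoolean("hbase.blockcache.use.external", true);
    conf.set("hbase.cache.memcached.servers", "mc1.example.com:11211,mc2.example.com:11211");
    return conf;
  }
}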

diff --git a/hbase-external-blockcache/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java b/hbase-external-blockcache/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java

@@ -27,28 +27,36 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import net.spy.memcached.CachedData;
 import net.spy.memcached.ConnectionFactoryBuilder;
 import net.spy.memcached.FailureMode;
 import net.spy.memcached.MemcachedClient;
+import net.spy.memcached.OperationTimeoutException;
 import net.spy.memcached.transcoders.Transcoder;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.io.ByteBuffAllocator;
 import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.nio.SingleByteBuff;
 import org.apache.hadoop.hbase.trace.TraceUtil;
 import org.apache.hadoop.hbase.util.Addressing;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 /**
  * Class to store blocks into memcached. This should only be used on a cluster of Memcached daemons
  * that are tuned well and have a good network connection to the HBase regionservers. Any other use
  * will likely slow down HBase greatly.
  */
 @InterfaceAudience.Private
+@SuppressWarnings("FutureReturnValueIgnored")
 public class MemcachedBlockCache implements BlockCache {
   private static final Logger LOG = LoggerFactory.getLogger(MemcachedBlockCache.class.getName());
@@ -56,6 +64,10 @@ public class MemcachedBlockCache implements BlockCache {
   // that just in case this client is used with those versions.
   public static final int MAX_SIZE = 1020 * 1024;
 
+  // Start memcached with -I <MAX_SIZE> to ensure it has the ability to store blocks of this size
+  public static final int MAX_TIME = 60 * 60 * 24 * 30; // 30 days, max allowed per the memcached
+                                                        // spec
+
   // Config key for what memcached servers to use.
   // They should be specified in a comma sperated list with ports.
   // like:
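
The 30-day figure is the protocol's boundary: memcached treats an exptime of up to 2,592,000 seconds as relative to now, while any larger value is read as an absolute unix timestamp; a small constant passed there becomes a date in 1970 and the key expires immediately. MAX_TIME pins the TTL to exactly the cap, the longest legal relative lifetime. A sketch of the rule (the helper is illustrative, not in the patch):

// Illustrative helper encoding the memcached expiration rule behind MAX_TIME.
final class MemcachedExpiry {
  static final int MAX_RELATIVE = 60 * 60 * 24 * 30; // 2,592,000 seconds

  // exptime values above the 30-day cap are interpreted by the server as
  // absolute unix timestamps rather than seconds from now.
  static boolean treatedAsUnixTimestamp(int exptime) {
    return exptime > MAX_RELATIVE;
  }
}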
@@ -67,10 +79,20 @@ public class MemcachedBlockCache implements BlockCache {
   public static final String MEMCACHED_OPTIMIZE_KEY = "hbase.cache.memcached.spy.optimze";
   public static final long MEMCACHED_DEFAULT_TIMEOUT = 500;
   public static final boolean MEMCACHED_OPTIMIZE_DEFAULT = false;
+  public static final int STAT_THREAD_PERIOD = 60 * 5;
 
   private final MemcachedClient client;
   private final HFileBlockTranscoder tc = new HFileBlockTranscoder();
   private final CacheStats cacheStats = new CacheStats("MemcachedBlockCache");
+  private final AtomicLong cachedCount = new AtomicLong();
+  private final AtomicLong notCachedCount = new AtomicLong();
+  private final AtomicLong cacheErrorCount = new AtomicLong();
+  private final AtomicLong timeoutCount = new AtomicLong();
+
+  /** Statistics thread schedule pool (for heavy debugging, could remove) */
+  private transient final ScheduledExecutorService scheduleThreadPool =
+    Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
+      .setNameFormat("MemcachedBlockCacheStatsExecutor").setDaemon(true).build());
 
   public MemcachedBlockCache(Configuration c) throws IOException {
     LOG.info("Creating MemcachedBlockCache");
@@ -80,18 +102,15 @@ public class MemcachedBlockCache implements BlockCache {
     boolean optimize = c.getBoolean(MEMCACHED_OPTIMIZE_KEY, MEMCACHED_OPTIMIZE_DEFAULT);
 
     ConnectionFactoryBuilder builder =
-      // Cap the max time before anything times out
       new ConnectionFactoryBuilder().setOpTimeout(opTimeout).setOpQueueMaxBlockTime(queueTimeout)
-        // Don't keep threads around past the end of days.
         .setFailureMode(FailureMode.Redistribute).setShouldOptimize(optimize).setDaemon(true)
-        .setUseNagleAlgorithm(false) // Ain't nobody got time for that
-        .setReadBufferSize(HConstants.DEFAULT_BLOCKSIZE * 4 * 1024); // Much larger just in case
+        .setUseNagleAlgorithm(false).setReadBufferSize(MAX_SIZE);
 
-    // Assume only the localhost is serving memecached.
+    // Assume only the localhost is serving memcached.
     // A la mcrouter or co-locating memcached with split regionservers.
     //
-    // If this config is a pool of memecached servers they will all be used according to the
-    // default hashing scheme defined by the memcache client. Spy Memecache client in this
+    // If this config is a pool of memcached servers they will all be used according to the
+    // default hashing scheme defined by the memcached client. Spy Memecache client in this
     // case.
     String serverListString = c.get(MEMCACHED_CONFIG_KEY, "localhost:11211");
     String[] servers = serverListString.split(",");
@@ -104,6 +123,8 @@ public class MemcachedBlockCache implements BlockCache {
     }
     client = new MemcachedClient(builder.build(), serverAddresses);
+    this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), STAT_THREAD_PERIOD,
+      STAT_THREAD_PERIOD, TimeUnit.SECONDS);
   }
 
   @Override
@@ -111,22 +132,27 @@ public class MemcachedBlockCache implements BlockCache {
     cacheBlock(cacheKey, buf);
   }
 
-  @SuppressWarnings("FutureReturnValueIgnored")
   @Override
   public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
     if (buf instanceof HFileBlock) {
-      client.add(cacheKey.toString(), MAX_SIZE, (HFileBlock) buf, tc).addListener(f -> {
+      if (buf.getSerializedLength() > MAX_SIZE) {
+        LOG.debug("Block of type {} with key {} is too large, size={}, max={}, will not cache",
+          buf.getClass(), cacheKey, buf.getSerializedLength(), MAX_SIZE);
+        notCachedCount.incrementAndGet();
+        return;
+      }
+      client.set(cacheKey.toString(), MAX_TIME, (HFileBlock) buf, tc).addListener(f -> {
         try {
           f.get();
-        } catch (ExecutionException e) {
-          LOG.warn("Failed to cache block", e);
+          cachedCount.incrementAndGet();
+        } catch (Exception e) {
+          LOG.warn("Failed to cache block with key " + cacheKey, e);
+          cacheErrorCount.incrementAndGet();
         }
       });
     } else {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(
-          "MemcachedBlockCache can not cache Cacheable's of type " + buf.getClass().toString());
-      }
+      LOG.debug("Can not cache Cacheables of type {} with key {}", buf.getClass(), cacheKey);
+      notCachedCount.incrementAndGet();
     }
   }
@@ -139,17 +165,25 @@ public class MemcachedBlockCache implements BlockCache {
     try (Scope traceScope = span.makeCurrent()) {
       result = client.get(cacheKey.toString(), tc);
     } catch (Exception e) {
-      // Catch a pretty broad set of exceptions to limit any changes in the memecache client
+      // Catch a pretty broad set of exceptions to limit any changes in the memcache client
       // and how it handles failures from leaking into the read path.
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(
-          "Exception pulling from memcached [ " + cacheKey.toString() + " ]. Treating as a miss.",
-          e);
+      if (
+        (e instanceof OperationTimeoutException) || ((e instanceof RuntimeException)
+          && (e.getCause() instanceof OperationTimeoutException))
+      ) {
+        timeoutCount.incrementAndGet();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Timeout getting key " + cacheKey.toString(), e);
+        }
+      } else {
+        cacheErrorCount.incrementAndGet();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Exception getting key " + cacheKey.toString(), e);
+        }
       }
       result = null;
     } finally {
       span.end();
-      // Update stats if this request doesn't have it turned off 100% of the time
       if (updateCacheMetrics) {
         if (result == null) {
           cacheStats.miss(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
@@ -158,7 +192,6 @@ public class MemcachedBlockCache implements BlockCache {
         }
       }
     }
-
     return result;
   }
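
spymemcached can surface a timeout from the synchronous get() either as an OperationTimeoutException directly or wrapped as the cause of a RuntimeException, which is why the catch block tests both shapes before deciding between the timeout and error counters. The same rule as a standalone predicate, a sketch for clarity only:

import net.spy.memcached.OperationTimeoutException;

// Sketch of the classification rule applied in the catch block above.
final class TimeoutClassifier {
  static boolean isTimeout(Exception e) {
    return e instanceof OperationTimeoutException
      || (e instanceof RuntimeException && e.getCause() instanceof OperationTimeoutException);
  }
}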
@@ -194,6 +227,22 @@ public class MemcachedBlockCache implements BlockCache {
   @Override
   public void shutdown() {
     client.shutdown();
+    this.scheduleThreadPool.shutdown();
+    for (int i = 0; i < 10; i++) {
+      if (!this.scheduleThreadPool.isShutdown()) {
+        try {
+          Thread.sleep(10);
+        } catch (InterruptedException e) {
+          LOG.warn("Interrupted while sleeping");
+          Thread.currentThread().interrupt();
+          break;
+        }
+      }
+    }
+    if (!this.scheduleThreadPool.isShutdown()) {
+      List<Runnable> runnables = this.scheduleThreadPool.shutdownNow();
+      LOG.debug("Still running " + runnables);
+    }
   }
 
   @Override
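
One caveat in the loop above: ScheduledExecutorService.isShutdown() reports true as soon as shutdown() has been called, while isTerminated() is what reports that queued tasks have finished, so the sleep branch is unlikely to run. The intent is more conventionally expressed with awaitTermination; an alternative sketch, not what the patch does:

// Alternative sketch using awaitTermination instead of polling isShutdown().
scheduleThreadPool.shutdown();
try {
  if (!scheduleThreadPool.awaitTermination(100, TimeUnit.MILLISECONDS)) {
    List<Runnable> pending = scheduleThreadPool.shutdownNow();
    LOG.debug("Still running " + pending);
  }
} catch (InterruptedException e) {
  Thread.currentThread().interrupt();
  scheduleThreadPool.shutdownNow();
}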
@@ -289,4 +338,38 @@ public class MemcachedBlockCache implements BlockCache {
       return MAX_SIZE;
     }
   }
+
+  private static class StatisticsThread extends Thread {
+    private final MemcachedBlockCache c;
+
+    public StatisticsThread(MemcachedBlockCache c) {
+      super("MemcachedBlockCacheStats");
+      setDaemon(true);
+      this.c = c;
+    }
+
+    @Override
+    public void run() {
+      c.logStats();
+    }
+  }
+
+  public void logStats() {
+    LOG.info("cached=" + cachedCount.get() + ", notCached=" + notCachedCount.get()
+      + ", cacheErrors=" + cacheErrorCount.get() + ", timeouts=" + timeoutCount.get() + ", reads="
+      + cacheStats.getRequestCount() + ", " + "hits=" + cacheStats.getHitCount() + ", hitRatio="
+      + (cacheStats.getHitCount() == 0
+        ? "0"
+        : (StringUtils.formatPercent(cacheStats.getHitRatio(), 2) + ", "))
+      + "cachingAccesses=" + cacheStats.getRequestCachingCount() + ", " + "cachingHits="
+      + cacheStats.getHitCachingCount() + ", " + "cachingHitsRatio="
+      + (cacheStats.getHitCachingCount() == 0
+        ? "0,"
+        : (StringUtils.formatPercent(cacheStats.getHitCachingRatio(), 2) + ", "))
+      + "evictions=" + cacheStats.getEvictionCount() + ", " + "evicted="
+      + cacheStats.getEvictedCount() + ", " + "evictedPerRun=" + cacheStats.evictedPerEviction());
+  }
 }
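
For reference, logStats() runs every STAT_THREAD_PERIOD (five minutes) and emits a single INFO line; with illustrative values it renders as:

cached=10240, notCached=3, cacheErrors=0, timeouts=1, reads=52000, hits=49400, hitRatio=95.00%, cachingAccesses=52000, cachingHits=49400, cachingHitsRatio=95.00%, evictions=0, evicted=0, evictedPerRun=0.0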

diff --git a/hbase-external-blockcache/src/test/java/org/apache/hadoop/hbase/io/hfile/TestMemcachedBlockCache.java b/hbase-external-blockcache/src/test/java/org/apache/hadoop/hbase/io/hfile/TestMemcachedBlockCache.java
new file mode 100644

@@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile;

import static org.junit.Assert.assertEquals;

import com.thimbleware.jmemcached.CacheElement;
import com.thimbleware.jmemcached.CacheImpl;
import com.thimbleware.jmemcached.Key;
import com.thimbleware.jmemcached.LocalCacheElement;
import com.thimbleware.jmemcached.MemCacheDaemon;
import com.thimbleware.jmemcached.storage.hash.ConcurrentLinkedHashMap;
import com.thimbleware.jmemcached.storage.hash.ConcurrentLinkedHashMap.EvictionPolicy;
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.io.hfile.CacheTestUtils.HFileBlockPair;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({ IOTests.class, SmallTests.class })
public class TestMemcachedBlockCache {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestMemcachedBlockCache.class);

  static MemCacheDaemon<? extends CacheElement> MEMCACHED;
  static MemcachedBlockCache CACHE;

  @Before
  public void before() throws Exception {
    MEMCACHED.getCache().flush_all();
    assertEquals("Memcache is not empty", MEMCACHED.getCache().getCurrentItems(), 0);
  }

  @BeforeClass
  public static void setup() throws Exception {
    int port = HBaseTestingUtil.randomFreePort();
    MEMCACHED = createDaemon(port);
    Configuration conf = new Configuration();
    conf.set("hbase.cache.memcached.servers", "localhost:" + port);
    CACHE = new MemcachedBlockCache(conf);
  }

  @AfterClass
  public static void tearDown() throws Exception {
    if (MEMCACHED != null) {
      MEMCACHED.stop();
    }
  }

  @Test
  public void testCache() throws Exception {
    final int NUM_BLOCKS = 10;
    HFileBlockPair[] blocks =
      CacheTestUtils.generateHFileBlocks(HConstants.DEFAULT_BLOCKSIZE, NUM_BLOCKS);
    for (int i = 0; i < NUM_BLOCKS; i++) {
      CACHE.cacheBlock(blocks[i].getBlockName(), blocks[i].getBlock());
    }
    Waiter.waitFor(new Configuration(), 10000,
      () -> MEMCACHED.getCache().getCurrentItems() == NUM_BLOCKS);
  }

  @Test
  public void testEviction() throws Exception {
    final int NUM_BLOCKS = 10;
    HFileBlockPair[] blocks =
      CacheTestUtils.generateHFileBlocks(HConstants.DEFAULT_BLOCKSIZE, NUM_BLOCKS);
    for (int i = 0; i < NUM_BLOCKS; i++) {
      CACHE.cacheBlock(blocks[i].getBlockName(), blocks[i].getBlock());
    }
    Waiter.waitFor(new Configuration(), 10000,
      () -> MEMCACHED.getCache().getCurrentItems() == NUM_BLOCKS);
    for (int i = 0; i < NUM_BLOCKS; i++) {
      CACHE.evictBlock(blocks[i].getBlockName());
    }
    Waiter.waitFor(new Configuration(), 10000, () -> MEMCACHED.getCache().getCurrentItems() == 0);
  }

  private static MemCacheDaemon<? extends CacheElement> createDaemon(int port) {
    InetSocketAddress addr = new InetSocketAddress("localhost", port);
    MemCacheDaemon<LocalCacheElement> daemon = new MemCacheDaemon<LocalCacheElement>();
    ConcurrentLinkedHashMap<Key, LocalCacheElement> cacheStorage =
      ConcurrentLinkedHashMap.create(EvictionPolicy.LRU, 1000, 1024 * 1024);
    daemon.setCache(new CacheImpl(cacheStorage));
    daemon.setAddr(addr);
    daemon.setVerbose(true);
    daemon.start();
    while (!daemon.isRunning()) {
      try {
        Thread.sleep(100);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
    return daemon;
  }
}
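
With jmemcached scoped to test, the suite is self-contained; assuming the standard module layout, something like mvn test -Dtest=TestMemcachedBlockCache -pl hbase-external-blockcache from the repository root should run it against the in-process daemon.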