From c7e5fee4527c0d91b6fd12be58a67eb081984a44 Mon Sep 17 00:00:00 2001 From: Frank Chen Date: Tue, 31 Aug 2021 07:59:53 +0800 Subject: [PATCH] Fix an exception when using redis cluster as cache (#11369) * Redis mget problem in cluster mode * Format code * push down implementation of getBulk to sub-classes * Add tests * revert some changes * Fix intelllij inspections * Fix comments Signed-off-by: frank chen * Update extensions-contrib/redis-cache/src/main/java/org/apache/druid/client/cache/RedisClusterCache.java Co-authored-by: Benedict Jin * Update extensions-contrib/redis-cache/src/test/java/org/apache/druid/client/cache/RedisClusterCacheTest.java Co-authored-by: Benedict Jin * Update extensions-contrib/redis-cache/src/main/java/org/apache/druid/client/cache/AbstractRedisCache.java Co-authored-by: Benedict Jin * returns empty map in case of internal exception Co-authored-by: Benedict Jin --- .../client/cache/AbstractRedisCache.java | 31 ++++----- .../druid/client/cache/RedisClusterCache.java | 63 +++++++++++++++++-- .../client/cache/RedisStandaloneCache.java | 23 ++++++- .../client/cache/RedisClusterCacheTest.java | 23 +++++-- 4 files changed, 108 insertions(+), 32 deletions(-) diff --git a/extensions-contrib/redis-cache/src/main/java/org/apache/druid/client/cache/AbstractRedisCache.java b/extensions-contrib/redis-cache/src/main/java/org/apache/druid/client/cache/AbstractRedisCache.java index 192e374ed8f..da11b76fbd7 100644 --- a/extensions-contrib/redis-cache/src/main/java/org/apache/druid/client/cache/AbstractRedisCache.java +++ b/extensions-contrib/redis-cache/src/main/java/org/apache/druid/client/cache/AbstractRedisCache.java @@ -19,15 +19,14 @@ package org.apache.druid.client.cache; -import com.google.common.collect.Lists; +import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.lifecycle.LifecycleStop; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import redis.clients.jedis.exceptions.JedisException; -import java.util.HashMap; -import java.util.List; +import java.util.Collections; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; @@ -93,21 +92,12 @@ public abstract class AbstractRedisCache implements Cache public Map getBulk(Iterable keys) { totalRequestCount.incrementAndGet(); - Map results = new HashMap<>(); - try { - List namedKeys = Lists.newArrayList(keys); - List byteKeys = Lists.transform(namedKeys, NamedKey::toByteArray); + Pair> results = this.mgetFromRedis(keys); - List byteValues = this.mgetFromRedis(byteKeys.toArray(new byte[0][])); - for (int i = 0; i < byteValues.size(); ++i) { - if (byteValues.get(i) != null) { - results.put(namedKeys.get(i), byteValues.get(i)); - } - } - - hitCount.addAndGet(results.size()); - missCount.addAndGet(namedKeys.size() - results.size()); + hitCount.addAndGet(results.rhs.size()); + missCount.addAndGet(results.lhs - results.rhs.size()); + return results.rhs; } catch (JedisException e) { if (e.getMessage().contains("Read timed out")) { @@ -116,9 +106,8 @@ public abstract class AbstractRedisCache implements Cache errorCount.incrementAndGet(); } log.warn(e, "Exception pulling items from cache"); + return Collections.emptyMap(); } - - return results; } @Override @@ -172,7 +161,11 @@ public abstract class AbstractRedisCache implements Cache protected abstract void putToRedis(byte[] key, byte[] value, RedisCacheConfig.DurationConfig expiration); - protected abstract List mgetFromRedis(byte[]... keys); + /** + * The lhs of the returned pair is the count of input keys + * The rhs of the returned pair is a map holding the values of their corresponding keys + */ + protected abstract Pair> mgetFromRedis(Iterable keys); protected abstract void cleanup(); } diff --git a/extensions-contrib/redis-cache/src/main/java/org/apache/druid/client/cache/RedisClusterCache.java b/extensions-contrib/redis-cache/src/main/java/org/apache/druid/client/cache/RedisClusterCache.java index b71cce724e1..35827babc76 100644 --- a/extensions-contrib/redis-cache/src/main/java/org/apache/druid/client/cache/RedisClusterCache.java +++ b/extensions-contrib/redis-cache/src/main/java/org/apache/druid/client/cache/RedisClusterCache.java @@ -19,14 +19,20 @@ package org.apache.druid.client.cache; +import org.apache.druid.java.util.common.Pair; import redis.clients.jedis.JedisCluster; +import redis.clients.util.JedisClusterCRC16; import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; public class RedisClusterCache extends AbstractRedisCache { - private JedisCluster cluster; + private final JedisCluster cluster; RedisClusterCache(JedisCluster cluster, RedisCacheConfig config) { @@ -46,10 +52,59 @@ public class RedisClusterCache extends AbstractRedisCache cluster.setex(key, (int) expiration.getSeconds(), value); } - @Override - protected List mgetFromRedis(byte[]... keys) + static class CachableKey { - return cluster.mget(keys); + byte[] keyBytes; + NamedKey namedKey; + + public CachableKey(NamedKey namedKey) + { + this.keyBytes = namedKey.toByteArray(); + this.namedKey = namedKey; + } + } + + /** + * Jedis does not work if the given keys are distributed among different redis nodes + * A simple workaround is to group keys by their slots and mget values for each slot. + *

+ * In future, Jedis could be replaced by the Lettuce driver which supports mget operation on a redis cluster + */ + @Override + protected Pair> mgetFromRedis(Iterable keys) + { + int inputKeyCount = 0; + + // group keys based on their slots + Map> slot2Keys = new HashMap<>(); + for (NamedKey key : keys) { + inputKeyCount++; + + CachableKey cachableKey = new CachableKey(key); + int keySlot = JedisClusterCRC16.getSlot(cachableKey.keyBytes); + slot2Keys.computeIfAbsent(keySlot, val -> new ArrayList<>()).add(cachableKey); + } + + ConcurrentHashMap results = new ConcurrentHashMap<>(); + slot2Keys.keySet() + .parallelStream() + .forEach(slot -> { + List keyList = slot2Keys.get(slot); + + // mget for this slot + List values = cluster.mget(keyList.stream() + .map(key -> key.keyBytes) + .toArray(byte[][]::new)); + + for (int i = 0; i < keyList.size(); i++) { + byte[] value = values.get(i); + if (value != null) { + results.put(keyList.get(i).namedKey, value); + } + } + }); + + return new Pair<>(inputKeyCount, results); } @Override diff --git a/extensions-contrib/redis-cache/src/main/java/org/apache/druid/client/cache/RedisStandaloneCache.java b/extensions-contrib/redis-cache/src/main/java/org/apache/druid/client/cache/RedisStandaloneCache.java index d2e5c59f17e..287d33ca76a 100644 --- a/extensions-contrib/redis-cache/src/main/java/org/apache/druid/client/cache/RedisStandaloneCache.java +++ b/extensions-contrib/redis-cache/src/main/java/org/apache/druid/client/cache/RedisStandaloneCache.java @@ -19,14 +19,18 @@ package org.apache.druid.client.cache; +import com.google.common.collect.Lists; +import org.apache.druid.java.util.common.Pair; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; +import java.util.HashMap; import java.util.List; +import java.util.Map; public class RedisStandaloneCache extends AbstractRedisCache { - private JedisPool pool; + private final JedisPool pool; RedisStandaloneCache(JedisPool pool, RedisCacheConfig config) { @@ -52,10 +56,23 @@ public class RedisStandaloneCache extends AbstractRedisCache } @Override - protected List mgetFromRedis(byte[]... keys) + protected Pair> mgetFromRedis(Iterable keys) { + List namedKeys = Lists.newArrayList(keys); + List byteKeys = Lists.transform(namedKeys, NamedKey::toByteArray); + try (Jedis jedis = pool.getResource()) { - return jedis.mget(keys); + + List byteValues = jedis.mget(byteKeys.toArray(new byte[0][])); + + Map results = new HashMap<>(); + for (int i = 0; i < byteValues.size(); ++i) { + if (byteValues.get(i) != null) { + results.put(namedKeys.get(i), byteValues.get(i)); + } + } + + return new Pair<>(namedKeys.size(), results); } } diff --git a/extensions-contrib/redis-cache/src/test/java/org/apache/druid/client/cache/RedisClusterCacheTest.java b/extensions-contrib/redis-cache/src/test/java/org/apache/druid/client/cache/RedisClusterCacheTest.java index bc439b47b41..c8a24b16f55 100644 --- a/extensions-contrib/redis-cache/src/test/java/org/apache/druid/client/cache/RedisClusterCacheTest.java +++ b/extensions-contrib/redis-cache/src/test/java/org/apache/druid/client/cache/RedisClusterCacheTest.java @@ -33,9 +33,10 @@ import redis.clients.jedis.JedisPoolConfig; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; public class RedisClusterCacheTest { @@ -58,6 +59,7 @@ public class RedisClusterCacheTest }; private RedisClusterCache cache; + private AtomicLong mgetCount = new AtomicLong(); @Before public void setUp() @@ -71,7 +73,7 @@ public class RedisClusterCacheTest // some methods must be overriden for test cases cache = new RedisClusterCache(new MockJedisCluster(Collections.singleton(new HostAndPort("localhost", 6379))) { - Map cacheStorage = new HashMap<>(); + final ConcurrentHashMap cacheStorage = new ConcurrentHashMap<>(); @Override public String setex(final byte[] key, final int seconds, final byte[] value) @@ -89,13 +91,12 @@ public class RedisClusterCacheTest @Override public List mget(final byte[]... keys) { + mgetCount.incrementAndGet(); List ret = new ArrayList<>(); for (byte[] key : keys) { String k = StringUtils.encodeBase64String(key); byte[] value = cacheStorage.get(k); - if (value != null) { - ret.add(value); - } + ret.add(value); } return ret; } @@ -122,6 +123,7 @@ public class RedisClusterCacheTest Cache.NamedKey key1 = new Cache.NamedKey("the", HI); Cache.NamedKey key2 = new Cache.NamedKey("the", HO); Cache.NamedKey key3 = new Cache.NamedKey("a", HI); + Cache.NamedKey notExist = new Cache.NamedKey("notExist", HI); //test put and get cache.put(key1, new byte[]{1, 2, 3, 4}); @@ -130,15 +132,24 @@ public class RedisClusterCacheTest Assert.assertEquals(0x01020304, Ints.fromByteArray(cache.get(key1))); Assert.assertEquals(0x02030405, Ints.fromByteArray(cache.get(key2))); Assert.assertEquals(0x03040506, Ints.fromByteArray(cache.get(key3))); + Assert.assertEquals(0x03040506, Ints.fromByteArray(cache.get(key3))); + Assert.assertNull(cache.get(notExist)); + + this.mgetCount.set(0); //test multi get Map result = cache.getBulk( Lists.newArrayList( key1, key2, - key3 + key3, + notExist ) ); + + // these 4 keys are distributed among different nodes, so there should be 4 times call of MGET + Assert.assertEquals(mgetCount.get(), 4); + Assert.assertEquals(result.size(), 3); Assert.assertEquals(0x01020304, Ints.fromByteArray(result.get(key1))); Assert.assertEquals(0x02030405, Ints.fromByteArray(result.get(key2))); Assert.assertEquals(0x03040506, Ints.fromByteArray(result.get(key3)));