Fix an exception when using redis cluster as cache (#11369)

* Redis mget problem in cluster mode

* Format code

* Push down implementation of getBulk to sub-classes

* Add tests

* Revert some changes

* Fix IntelliJ inspections

* Fix comments

Signed-off-by: frank chen <frank.chen021@outlook.com>

* Update extensions-contrib/redis-cache/src/main/java/org/apache/druid/client/cache/RedisClusterCache.java

Co-authored-by: Benedict Jin <asdf2014@apache.org>

* Update extensions-contrib/redis-cache/src/test/java/org/apache/druid/client/cache/RedisClusterCacheTest.java

Co-authored-by: Benedict Jin <asdf2014@apache.org>

* Update extensions-contrib/redis-cache/src/main/java/org/apache/druid/client/cache/AbstractRedisCache.java

Co-authored-by: Benedict Jin <asdf2014@apache.org>

* Return an empty map in case of an internal exception

Co-authored-by: Benedict Jin <asdf2014@apache.org>
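
For context, the failure this commit works around can be reproduced with a minimal sketch like the one below. This is not part of the commit: the cluster address and key names are made up, and the exception message is paraphrased, so treat it as an illustration of Jedis rejecting a cross-slot MGET rather than a verbatim repro.

// Hypothetical repro of the bug (not part of this commit).
// Jedis refuses a single MGET whose keys hash to different cluster slots,
// failing with a JedisClusterException along the lines of
// "No way to dispatch this command to Redis Cluster because keys have different slots."
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;

import java.nio.charset.StandardCharsets;
import java.util.Collections;

public class CrossSlotMgetRepro
{
  public static void main(String[] args)
  {
    // assumes a redis cluster node is reachable at localhost:7000
    JedisCluster cluster = new JedisCluster(Collections.singleton(new HostAndPort("localhost", 7000)));
    byte[] k1 = "druid:cache:a".getBytes(StandardCharsets.UTF_8);
    byte[] k2 = "druid:cache:b".getBytes(StandardCharsets.UTF_8);
    // the two keys almost certainly land in different slots, so this call throws
    cluster.mget(k1, k2);
  }
}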
Frank Chen 2021-08-31 07:59:53 +08:00 committed by GitHub
parent 6d14ea2d14
commit c7e5fee452
4 changed files with 108 additions and 32 deletions

extensions-contrib/redis-cache/src/main/java/org/apache/druid/client/cache/AbstractRedisCache.java

@@ -19,15 +19,14 @@
 package org.apache.druid.client.cache;
 
-import com.google.common.collect.Lists;
+import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import redis.clients.jedis.exceptions.JedisException;
 
-import java.util.HashMap;
-import java.util.List;
+import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
@@ -93,21 +92,12 @@ public abstract class AbstractRedisCache implements Cache
   public Map<NamedKey, byte[]> getBulk(Iterable<NamedKey> keys)
   {
     totalRequestCount.incrementAndGet();
 
-    Map<NamedKey, byte[]> results = new HashMap<>();
     try {
-      List<NamedKey> namedKeys = Lists.newArrayList(keys);
-      List<byte[]> byteKeys = Lists.transform(namedKeys, NamedKey::toByteArray);
-
-      List<byte[]> byteValues = this.mgetFromRedis(byteKeys.toArray(new byte[0][]));
-      for (int i = 0; i < byteValues.size(); ++i) {
-        if (byteValues.get(i) != null) {
-          results.put(namedKeys.get(i), byteValues.get(i));
-        }
-      }
-      hitCount.addAndGet(results.size());
-      missCount.addAndGet(namedKeys.size() - results.size());
+      Pair<Integer, Map<NamedKey, byte[]>> results = this.mgetFromRedis(keys);
+      hitCount.addAndGet(results.rhs.size());
+      missCount.addAndGet(results.lhs - results.rhs.size());
+      return results.rhs;
     }
     catch (JedisException e) {
       if (e.getMessage().contains("Read timed out")) {
@@ -116,9 +106,8 @@ public abstract class AbstractRedisCache implements Cache
         errorCount.incrementAndGet();
       }
       log.warn(e, "Exception pulling items from cache");
+      return Collections.emptyMap();
     }
-
-    return results;
   }
 
   @Override
@@ -172,7 +161,11 @@ public abstract class AbstractRedisCache implements Cache
   protected abstract void putToRedis(byte[] key, byte[] value, RedisCacheConfig.DurationConfig expiration);
 
-  protected abstract List<byte[]> mgetFromRedis(byte[]... keys);
+  /**
+   * The lhs of the returned pair is the count of input keys.
+   * The rhs of the returned pair is a map holding the value of each input key that is present in the cache.
+   */
+  protected abstract Pair<Integer, Map<NamedKey, byte[]>> mgetFromRedis(Iterable<NamedKey> keys);
 
   protected abstract void cleanup();
 }
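
The Pair contract above is small but easy to get wrong, so here is a minimal in-memory sketch of an implementation honoring it. This is illustrative only, not part of the commit: it uses String keys instead of NamedKey to stay self-contained, and the class and method names are made up.

// Illustrative only: lhs counts every requested key, rhs contains only the hits,
// so a caller can compute hits as rhs.size() and misses as lhs - rhs.size().
import org.apache.druid.java.util.common.Pair;

import java.util.HashMap;
import java.util.Map;

class InMemoryMgetSketch
{
  private final Map<String, byte[]> store = new HashMap<>();

  Pair<Integer, Map<String, byte[]>> mget(Iterable<String> keys)
  {
    int inputKeyCount = 0;
    Map<String, byte[]> found = new HashMap<>();
    for (String key : keys) {
      inputKeyCount++;
      byte[] value = store.get(key);
      if (value != null) {
        found.put(key, value); // missing keys are simply absent from the map
      }
    }
    return new Pair<>(inputKeyCount, found);
  }
}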

extensions-contrib/redis-cache/src/main/java/org/apache/druid/client/cache/RedisClusterCache.java

@@ -19,14 +19,20 @@
 package org.apache.druid.client.cache;
 
+import org.apache.druid.java.util.common.Pair;
 import redis.clients.jedis.JedisCluster;
+import redis.clients.util.JedisClusterCRC16;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 public class RedisClusterCache extends AbstractRedisCache
 {
-  private JedisCluster cluster;
+  private final JedisCluster cluster;
 
   RedisClusterCache(JedisCluster cluster, RedisCacheConfig config)
   {
@@ -46,10 +52,59 @@ public class RedisClusterCache extends AbstractRedisCache
     cluster.setex(key, (int) expiration.getSeconds(), value);
   }
 
-  @Override
-  protected List<byte[]> mgetFromRedis(byte[]... keys)
+  static class CachableKey
   {
-    return cluster.mget(keys);
+    byte[] keyBytes;
+    NamedKey namedKey;
+
+    public CachableKey(NamedKey namedKey)
+    {
+      this.keyBytes = namedKey.toByteArray();
+      this.namedKey = namedKey;
+    }
   }
+
+  /**
+   * Jedis does not support mget when the given keys hash to different slots of a redis cluster.
+   * A simple workaround is to group keys by their slots and mget values for each slot.
+   * <p>
+   * In the future, Jedis could be replaced by the Lettuce driver, which supports mget on a redis cluster.
+   */
+  @Override
+  protected Pair<Integer, Map<NamedKey, byte[]>> mgetFromRedis(Iterable<NamedKey> keys)
+  {
+    int inputKeyCount = 0;
+
+    // group keys based on their slots
+    Map<Integer, List<CachableKey>> slot2Keys = new HashMap<>();
+    for (NamedKey key : keys) {
+      inputKeyCount++;
+
+      CachableKey cachableKey = new CachableKey(key);
+      int keySlot = JedisClusterCRC16.getSlot(cachableKey.keyBytes);
+      slot2Keys.computeIfAbsent(keySlot, val -> new ArrayList<>()).add(cachableKey);
+    }
+
+    ConcurrentHashMap<NamedKey, byte[]> results = new ConcurrentHashMap<>();
+    slot2Keys.keySet()
+             .parallelStream()
+             .forEach(slot -> {
+               List<CachableKey> keyList = slot2Keys.get(slot);
+
+               // mget for this slot
+               List<byte[]> values = cluster.mget(keyList.stream()
+                                                         .map(key -> key.keyBytes)
+                                                         .toArray(byte[][]::new));
+               for (int i = 0; i < keyList.size(); i++) {
+                 byte[] value = values.get(i);
+                 if (value != null) {
+                   results.put(keyList.get(i).namedKey, value);
+                 }
+               }
+             });
+
+    return new Pair<>(inputKeyCount, results);
+  }
 
   @Override
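
The slot computation used above is the same CRC16-mod-16384 hashing that the redis cluster itself applies to keys. A standalone sketch of the grouping step (key names are made up, and the class name is hypothetical) shows why one MGET per group is then safe to dispatch:

// Standalone sketch: group arbitrary byte[] keys by their redis cluster hash slot,
// mirroring the slot2Keys map built in mgetFromRedis above.
import redis.clients.util.JedisClusterCRC16;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SlotGroupingSketch
{
  public static void main(String[] args)
  {
    Map<Integer, List<byte[]>> slotToKeys = new HashMap<>();
    for (String name : new String[]{"the/hi", "the/ho", "a/hi", "notExist/hi"}) {
      byte[] key = name.getBytes(StandardCharsets.UTF_8);
      // CRC16(key) mod 16384: the same slot computation the redis cluster uses
      int slot = JedisClusterCRC16.getSlot(key);
      slotToKeys.computeIfAbsent(slot, s -> new ArrayList<>()).add(key);
    }
    // one MGET per entry of slotToKeys can now be dispatched without a cross-slot error
    System.out.println("distinct slots: " + slotToKeys.size());
  }
}

A note on the design choice: because the per-slot MGETs are issued from a parallelStream, the shared results map must be thread-safe, which is why mgetFromRedis collects into a ConcurrentHashMap and stores only non-null values (ConcurrentHashMap rejects null values).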

extensions-contrib/redis-cache/src/main/java/org/apache/druid/client/cache/RedisStandaloneCache.java

@@ -19,14 +19,18 @@
 package org.apache.druid.client.cache;
 
+import com.google.common.collect.Lists;
+import org.apache.druid.java.util.common.Pair;
 import redis.clients.jedis.Jedis;
 import redis.clients.jedis.JedisPool;
 
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 public class RedisStandaloneCache extends AbstractRedisCache
 {
-  private JedisPool pool;
+  private final JedisPool pool;
 
   RedisStandaloneCache(JedisPool pool, RedisCacheConfig config)
   {
@@ -52,10 +56,23 @@ public class RedisStandaloneCache extends AbstractRedisCache
   }
 
   @Override
-  protected List<byte[]> mgetFromRedis(byte[]... keys)
+  protected Pair<Integer, Map<NamedKey, byte[]>> mgetFromRedis(Iterable<NamedKey> keys)
   {
+    List<NamedKey> namedKeys = Lists.newArrayList(keys);
+    List<byte[]> byteKeys = Lists.transform(namedKeys, NamedKey::toByteArray);
+
     try (Jedis jedis = pool.getResource()) {
-      return jedis.mget(keys);
+      List<byte[]> byteValues = jedis.mget(byteKeys.toArray(new byte[0][]));
+
+      Map<NamedKey, byte[]> results = new HashMap<>();
+      for (int i = 0; i < byteValues.size(); ++i) {
+        if (byteValues.get(i) != null) {
+          results.put(namedKeys.get(i), byteValues.get(i));
+        }
+      }
+
+      return new Pair<>(namedKeys.size(), results);
     }
   }

extensions-contrib/redis-cache/src/test/java/org/apache/druid/client/cache/RedisClusterCacheTest.java

@@ -33,9 +33,10 @@ import redis.clients.jedis.JedisPoolConfig;
 
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
 
 public class RedisClusterCacheTest
 {
@@ -58,6 +59,7 @@ public class RedisClusterCacheTest
   };
 
   private RedisClusterCache cache;
+  private AtomicLong mgetCount = new AtomicLong();
 
   @Before
   public void setUp()
@@ -71,7 +73,7 @@ public class RedisClusterCacheTest
     // some methods must be overridden for test cases
     cache = new RedisClusterCache(new MockJedisCluster(Collections.singleton(new HostAndPort("localhost", 6379)))
     {
-      Map<String, byte[]> cacheStorage = new HashMap<>();
+      final ConcurrentHashMap<String, byte[]> cacheStorage = new ConcurrentHashMap<>();
 
       @Override
       public String setex(final byte[] key, final int seconds, final byte[] value)
@@ -89,13 +91,12 @@ public class RedisClusterCacheTest
       @Override
       public List<byte[]> mget(final byte[]... keys)
       {
+        mgetCount.incrementAndGet();
        List<byte[]> ret = new ArrayList<>();
         for (byte[] key : keys) {
           String k = StringUtils.encodeBase64String(key);
           byte[] value = cacheStorage.get(k);
-          if (value != null) {
-            ret.add(value);
-          }
+          ret.add(value);
         }
         return ret;
       }
@@ -122,6 +123,7 @@ public class RedisClusterCacheTest
     Cache.NamedKey key1 = new Cache.NamedKey("the", HI);
     Cache.NamedKey key2 = new Cache.NamedKey("the", HO);
     Cache.NamedKey key3 = new Cache.NamedKey("a", HI);
+    Cache.NamedKey notExist = new Cache.NamedKey("notExist", HI);
 
     //test put and get
     cache.put(key1, new byte[]{1, 2, 3, 4});
@@ -130,15 +132,24 @@ public class RedisClusterCacheTest
     Assert.assertEquals(0x01020304, Ints.fromByteArray(cache.get(key1)));
     Assert.assertEquals(0x02030405, Ints.fromByteArray(cache.get(key2)));
     Assert.assertEquals(0x03040506, Ints.fromByteArray(cache.get(key3)));
+    Assert.assertNull(cache.get(notExist));
+
+    this.mgetCount.set(0);
 
     //test multi get
     Map<Cache.NamedKey, byte[]> result = cache.getBulk(
         Lists.newArrayList(
             key1,
             key2,
-            key3
+            key3,
+            notExist
         )
     );
+
+    // these 4 keys hash to 4 different slots, so there should be 4 MGET calls
+    Assert.assertEquals(mgetCount.get(), 4);
+    Assert.assertEquals(result.size(), 3);
     Assert.assertEquals(0x01020304, Ints.fromByteArray(result.get(key1)));
     Assert.assertEquals(0x02030405, Ints.fromByteArray(result.get(key2)));
     Assert.assertEquals(0x03040506, Ints.fromByteArray(result.get(key3)));