mirror of https://github.com/apache/druid.git
Redis cache extension enhancement (#10240)
* support redis cluster
* add 'password', 'database' properties
* test cases passed
* update doc
* some improvements
* fix CI
* add more test cases to improve branch coverage
* fix dependency check for test
* resolve review comments
parent a607e9e7ff
commit 028442e75e
@ -22,32 +22,85 @@ title: "Druid Redis Cache"
~ under the License.
-->

A cache implementation for Druid based on [Redis](https://github.com/redis/redis).

To use this Apache Druid extension, make sure to [include](../../development/extensions.md#loading-extensions) `druid-redis-cache` extension.
Below are guidance and configuration options known to this module.

A cache implementation for Druid based on [Redis](https://github.com/antirez/redis).

## Installation

Below are the configuration options known to this module.

Use [pull-deps](../../operations/pull-deps.md) tool shipped with Druid to install this [extension](../../development/extensions.md#community-extensions) on broker, historical and middle manager nodes.

Note that just adding these properties does not enable the cache. You still need to add the `druid.<process-type>.cache.useCache` and `druid.<process-type>.cache.populateCache` properties for the processes you want to enable the cache on as described in the [cache configuration docs](../../configuration/index.html#cache-configuration).

```bash
java -classpath "druid_dir/lib/*" org.apache.druid.cli.Main tools pull-deps -c org.apache.druid.extensions.contrib:druid-redis-cache:{VERSION}
```

A possible configuration would be to keep the properties below in your `common.runtime.properties` file (present on all processes) and then add `druid.<nodetype>.cache.useCache` and `druid.<nodetype>.cache.populateCache` in the `runtime.properties` file of the process types you want to enable caching on.

## Enabling

To enable this extension after installation,

1. [include](../../development/extensions.md#loading-extensions) this `druid-redis-cache` extension
2. to enable cache on broker nodes, follow [broker caching docs](../../configuration/index.html#broker-caching) to set related properties
3. to enable cache on historical nodes, follow [historical caching docs](../../configuration/index.html#historical-caching) to set related properties
4. to enable cache on middle manager nodes, follow [peon caching docs](../../configuration/index.html#peon-caching) to set related properties
5. set `druid.cache.type` to `redis`
6. add the following properties
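
As a rough sketch, the steps above might translate into properties like the following for a Broker process. This is illustrative only: the caching properties are described in the linked caching docs, and the loadList normally contains whatever other extensions you already use.

```properties
# load the extension (alongside any extensions already on the loadList)
druid.extensions.loadList=["druid-redis-cache"]

# enable query caching on the broker
druid.broker.cache.useCache=true
druid.broker.cache.populateCache=true

# use this extension as the cache implementation
druid.cache.type=redis
```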
## Configuration

|`common.runtime.properties`|Description|Default|Required|

### Cluster mode

To utilize a Redis cluster, the following properties must be set.

Note: some Redis cloud service providers offer Redis cluster service via a Redis proxy; for these clusters, please follow the [Standalone mode](#standalone-mode) configuration below.

| Properties |Description|Default|Required|
|--------------------|-----------|-------|--------|
|`druid.cache.cluster.nodes`| Redis nodes in a cluster, represented as a comma-separated string. See example below | None | yes |
|`druid.cache.cluster.maxRedirection`| Max retry count | 5 | no |

#### Example

```properties
# a typical redis cluster with 6 nodes
druid.cache.cluster.nodes=127.0.0.1:7001,127.0.0.1:7002,127.0.0.1:7003,127.0.0.1:7004,127.0.0.1:7005,127.0.0.1:7006
```

### Standalone mode

To use a standalone Redis, the following properties must be set.

| Properties |Description|Default|Required|
|--------------------|-----------|-------|--------|
|`druid.cache.host`|Redis server host|None|yes|
|`druid.cache.port`|Redis server port|None|yes|
|`druid.cache.expiration`|Expiration (in milliseconds) for cache entries|24 * 3600 * 1000|no|
|`druid.cache.timeout`|Timeout (in milliseconds) for getting cache entries from Redis|2000|no|
|`druid.cache.database`|Redis database index|0|no|

Note: if both `druid.cache.cluster.nodes` and `druid.cache.host` are provided, cluster mode takes precedence.
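
For example, a minimal standalone configuration might look like the sketch below; the host, port and database values are placeholders.

```properties
# point the cache at a single redis server
druid.cache.type=redis
druid.cache.host=127.0.0.1
druid.cache.port=6379

# optional: use a non-default logical database
druid.cache.database=0
```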
### Shared Properties

In addition to the properties above, there are some extra properties which can be customized to meet different needs.

| Properties |Description|Default|Required|
|--------------------|-----------|-------|--------|
|`druid.cache.password`| Password to access the Redis server/cluster | None |no|
|`druid.cache.expiration`|Expiration for cache entries | P1D |no|
|`druid.cache.timeout`|Timeout for connecting to Redis and reading entries from Redis|PT2S|no|
|`druid.cache.maxTotalConnections`|Max total connections to Redis|8|no|
|`druid.cache.maxIdleConnections`|Max idle connections to Redis|8|no|
|`druid.cache.minIdleConnections`|Min idle connections to Redis|0|no|
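
As a sketch, these shared properties can be combined with either mode; the values below are illustrative rather than recommendations.

```properties
# authentication and entry lifetime
druid.cache.password=yourRedisPassword
druid.cache.expiration=PT1H
druid.cache.timeout=PT2S

# connection pool sizing
druid.cache.maxTotalConnections=16
druid.cache.maxIdleConnections=8
druid.cache.minIdleConnections=2
```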
## Enabling

For the `druid.cache.expiration` and `druid.cache.timeout` properties, values can be given in `Period` format or as a number of milliseconds.

To enable the redis cache, include this module on the loadList and set `druid.cache.type` to `redis` in your properties.

```properties
# Period format (recommended)
# cache expires after 1 hour
druid.cache.expiration=PT1H

# or in number (milliseconds) format
# 1 hour = 3_600_000 milliseconds
druid.cache.expiration=3600000
```
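
Similarly, `druid.cache.timeout` accepts either form; a short sketch with illustrative values:

```properties
# Period format
druid.cache.timeout=PT5S

# or in number (milliseconds) format
druid.cache.timeout=5000
```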

## Metrics

@ -78,6 +78,32 @@
<artifactId>jackson-databind</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>javax.validation</groupId>
<artifactId>validation-api</artifactId>
<version>1.1.0.Final</version>
<scope>provided</scope>
</dependency>

<!-- Tests -->
<dependency>

@ -91,6 +117,12 @@
<version>0.4.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-core</artifactId>
<version>1.3</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

@ -19,15 +19,11 @@

package org.apache.druid.client.cache;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.exceptions.JedisException;

import java.util.HashMap;

@ -35,12 +31,9 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

public class RedisCache implements Cache
public abstract class AbstractRedisCache implements Cache
{
private static final Logger log = new Logger(RedisCache.class);

private JedisPool pool;
private RedisCacheConfig config;
private static final Logger log = new Logger(AbstractRedisCache.class);

private final AtomicLong hitCount = new AtomicLong(0);
private final AtomicLong missCount = new AtomicLong(0);

@ -51,30 +44,19 @@ public class RedisCache implements Cache
// both get, put and getBulk will increase request count by 1
private final AtomicLong totalRequestCount = new AtomicLong(0);

private RedisCache(JedisPool pool, RedisCacheConfig config)
{
this.pool = pool;
this.config = config;
}
private RedisCacheConfig.DurationConfig expiration;

public static RedisCache create(final RedisCacheConfig config)
protected AbstractRedisCache(RedisCacheConfig config)
{
JedisPoolConfig poolConfig = new JedisPoolConfig();
poolConfig.setMaxTotal(config.getMaxTotalConnections());
poolConfig.setMaxIdle(config.getMaxIdleConnections());
poolConfig.setMinIdle(config.getMinIdleConnections());

JedisPool pool = new JedisPool(poolConfig, config.getHost(), config.getPort(), config.getTimeout());
return new RedisCache(pool, config);
this.expiration = config.getExpiration();
}

@Override
public byte[] get(NamedKey key)
{
totalRequestCount.incrementAndGet();

try (Jedis jedis = pool.getResource()) {
byte[] bytes = jedis.get(key.toByteArray());
try {
byte[] bytes = getFromRedis(key.toByteArray());
if (bytes == null) {
missCount.incrementAndGet();
return null;

@ -98,9 +80,8 @@ public class RedisCache implements Cache
public void put(NamedKey key, byte[] value)
{
totalRequestCount.incrementAndGet();

try (Jedis jedis = pool.getResource()) {
jedis.psetex(key.toByteArray(), config.getExpiration(), value);
try {
this.putToRedis(key.toByteArray(), value, this.expiration);
}
catch (JedisException e) {
errorCount.incrementAndGet();

@ -112,15 +93,13 @@ public class RedisCache implements Cache
public Map<NamedKey, byte[]> getBulk(Iterable<NamedKey> keys)
{
totalRequestCount.incrementAndGet();

Map<NamedKey, byte[]> results = new HashMap<>();

try (Jedis jedis = pool.getResource()) {
try {
List<NamedKey> namedKeys = Lists.newArrayList(keys);
List<byte[]> byteKeys = Lists.transform(namedKeys, NamedKey::toByteArray);

List<byte[]> byteValues = jedis.mget(byteKeys.toArray(new byte[0][]));

List<byte[]> byteValues = this.mgetFromRedis(byteKeys.toArray(new byte[0][]));
for (int i = 0; i < byteValues.size(); ++i) {
if (byteValues.get(i) != null) {
results.put(namedKeys.get(i), byteValues.get(i));

@ -152,7 +131,7 @@ public class RedisCache implements Cache
@LifecycleStop
public void close()
{
pool.close();
cleanup();
}

@Override

@ -189,9 +168,11 @@ public class RedisCache implements Cache
}
}

@VisibleForTesting
static RedisCache create(final JedisPool pool, final RedisCacheConfig config)
{
return new RedisCache(pool, config);
}
protected abstract byte[] getFromRedis(byte[] key);

protected abstract void putToRedis(byte[] key, byte[] value, RedisCacheConfig.DurationConfig expiration);

protected abstract List<byte[]> mgetFromRedis(byte[]... keys);

protected abstract void cleanup();
}

@ -20,35 +20,133 @@

package org.apache.druid.client.cache;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.metadata.PasswordProvider;
import org.joda.time.Period;
import redis.clients.jedis.Protocol;

import javax.validation.constraints.Max;
import javax.validation.constraints.Min;

public class RedisCacheConfig
{
public static class RedisClusterConfig
{
@JsonProperty
private String nodes;

// cluster
@JsonProperty
private int maxRedirection = 5;

public String getNodes()
{
return nodes;
}

public int getMaxRedirection()
{
return maxRedirection;
}
}

/**
* Support for long-format and Period style format
*/
public static class DurationConfig
{
private long milliseconds;

public DurationConfig(String time)
{
try {
// before 0.19.0, only the long format was supported;
// try to parse it as long
this.milliseconds = Long.parseLong(time);
}
catch (NumberFormatException e) {
// try to parse it as a Period string
this.milliseconds = Period.parse(time).toStandardDuration().getMillis();
}
}

/**
* kept for test cases only
*/
@VisibleForTesting
DurationConfig(long milliseconds)
{
this.milliseconds = milliseconds;
}

public long getMilliseconds()
{
return milliseconds;
}

public int getMillisecondsAsInt()
{
if (milliseconds > Integer.MAX_VALUE) {
throw new ISE("Milliseconds %d is out of range of int", milliseconds);
}
return (int) milliseconds;
}

public long getSeconds()
{
return milliseconds / 1000;
}
}

/**
* host of a standalone mode redis
*/
@JsonProperty
private String host;

/**
* port of a standalone mode redis
*/
@JsonProperty
@Min(0)
@Max(65535)
private int port;

// milliseconds, default to one day
@JsonProperty
private long expiration = 24 * 3600 * 1000;
private DurationConfig expiration = new DurationConfig("P1D");

// milliseconds, the type is 'int' because current Jedis only accept 'int' for timeout
@JsonProperty
private int timeout = 2000;
private DurationConfig timeout = new DurationConfig("PT2S");

// max connections of redis connection pool
/**
* max connections of redis connection pool
*/
@JsonProperty
private int maxTotalConnections = 8;

// max idle connections of redis connection pool
/**
* max idle connections of redis connection pool
*/
@JsonProperty
private int maxIdleConnections = 8;

// min idle connections of redis connection pool
/**
* min idle connections of redis connection pool
*/
@JsonProperty
private int minIdleConnections = 0;

@JsonProperty
private PasswordProvider password;

@JsonProperty
@Min(0)
private int database = Protocol.DEFAULT_DATABASE;

@JsonProperty
private RedisClusterConfig cluster;

public String getHost()
{
return host;

@ -59,12 +157,12 @@ public class RedisCacheConfig
return port;
}

public long getExpiration()
public DurationConfig getExpiration()
{
return expiration;
}

public int getTimeout()
public DurationConfig getTimeout()
{
return timeout;
}

@ -83,4 +181,19 @@ public class RedisCacheConfig
{
return minIdleConnections;
}

public RedisClusterConfig getCluster()
{
return cluster;
}

public PasswordProvider getPassword()
{
return password;
}

public int getDatabase()
{
return database;
}
}

extensions-contrib/redis-cache/src/main/java/org/apache/druid/client/cache/RedisCacheFactory.java (new file, 113 lines)
@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.client.cache;

import org.apache.commons.lang.StringUtils;
import org.apache.druid.java.util.common.IAE;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

public class RedisCacheFactory
{
public static Cache create(final RedisCacheConfig config)
{
if (config.getCluster() != null && StringUtils.isNotBlank(config.getCluster().getNodes())) {

Set<HostAndPort> nodes = Arrays.stream(config.getCluster().getNodes().split(","))
.map(String::trim)
.filter(StringUtils::isNotBlank)
.map(hostAndPort -> {
int index = hostAndPort.indexOf(':');
if (index <= 0 || index == hostAndPort.length()) {
throw new IAE("Invalid redis cluster configuration: %s", hostAndPort);
}

int port;
try {
port = Integer.parseInt(hostAndPort.substring(index + 1));
}
catch (NumberFormatException e) {
throw new IAE("Invalid port in %s", hostAndPort);
}
if (port <= 0 || port > 65535) {
throw new IAE("Invalid port in %s", hostAndPort);
}

return new HostAndPort(hostAndPort.substring(0, index), port);
}).collect(Collectors.toSet());

JedisPoolConfig poolConfig = new JedisPoolConfig();
poolConfig.setMaxTotal(config.getMaxTotalConnections());
poolConfig.setMaxIdle(config.getMaxIdleConnections());
poolConfig.setMinIdle(config.getMinIdleConnections());

JedisCluster cluster;
if (config.getPassword() != null) {
cluster = new JedisCluster(
nodes,
config.getTimeout().getMillisecondsAsInt(), //connection timeout
config.getTimeout().getMillisecondsAsInt(), //read timeout
config.getCluster().getMaxRedirection(),
config.getPassword().getPassword(),
poolConfig
);
} else {
cluster = new JedisCluster(
nodes,
config.getTimeout().getMillisecondsAsInt(), //connection timeout and read timeout
config.getCluster().getMaxRedirection(),
poolConfig
);
}

return new RedisClusterCache(cluster, config);

} else {

if (StringUtils.isBlank(config.getHost())) {
throw new IAE("Invalid redis configuration. no redis server or cluster configured.");
}

JedisPoolConfig poolConfig = new JedisPoolConfig();
poolConfig.setMaxTotal(config.getMaxTotalConnections());
poolConfig.setMaxIdle(config.getMaxIdleConnections());
poolConfig.setMinIdle(config.getMinIdleConnections());

return new RedisStandaloneCache(
new JedisPool(
poolConfig,
config.getHost(),
config.getPort(),
config.getTimeout().getMillisecondsAsInt(), //connection timeout and read timeout
config.getPassword() == null ? null : config.getPassword().getPassword(),
config.getDatabase(),
null
),
config
);
}
}
}

@ -27,6 +27,6 @@ public class RedisCacheProvider extends RedisCacheConfig implements CacheProvide
@Override
public Cache get()
{
return RedisCache.create(this);
return RedisCacheFactory.create(this);
}
}

@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.client.cache;

import redis.clients.jedis.JedisCluster;

import java.io.IOException;
import java.util.List;

public class RedisClusterCache extends AbstractRedisCache
{
private JedisCluster cluster;

RedisClusterCache(JedisCluster cluster, RedisCacheConfig config)
{
super(config);
this.cluster = cluster;
}

@Override
protected byte[] getFromRedis(byte[] key)
{
return cluster.get(key);
}

@Override
protected void putToRedis(byte[] key, byte[] value, RedisCacheConfig.DurationConfig expiration)
{
cluster.setex(key, (int) expiration.getSeconds(), value);
}

@Override
protected List<byte[]> mgetFromRedis(byte[]... keys)
{
return cluster.mget(keys);
}

@Override
protected void cleanup()
{
try {
cluster.close();
}
catch (IOException ignored) {
}
}
}

@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.client.cache;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

import java.util.List;

public class RedisStandaloneCache extends AbstractRedisCache
{
private JedisPool pool;

RedisStandaloneCache(JedisPool pool, RedisCacheConfig config)
{
super(config);

this.pool = pool;
}

@Override
protected byte[] getFromRedis(byte[] key)
{
try (Jedis jedis = pool.getResource()) {
return jedis.get(key);
}
}

@Override
protected void putToRedis(byte[] key, byte[] value, RedisCacheConfig.DurationConfig expiration)
{
try (Jedis jedis = pool.getResource()) {
jedis.psetex(key, expiration.getMilliseconds(), value);
}
}

@Override
protected List<byte[]> mgetFromRedis(byte[]... keys)
{
try (Jedis jedis = pool.getResource()) {
return jedis.mget(keys);
}
}

@Override
protected void cleanup()
{
pool.close();
}
}

extensions-contrib/redis-cache/src/test/java/org/apache/druid/client/cache/RedisCacheConfigTest.java (new file, 227 lines)
@ -0,0 +1,227 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.client.cache;
|
||||
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.apache.druid.java.util.common.IAE;
|
||||
import org.hamcrest.Description;
|
||||
import org.hamcrest.Matcher;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.ExpectedException;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class RedisCacheConfigTest
|
||||
{
|
||||
@FunctionalInterface
|
||||
interface MessageMatcher
|
||||
{
|
||||
boolean match(String input);
|
||||
}
|
||||
|
||||
static class StartWithMatcher implements MessageMatcher
|
||||
{
|
||||
private String expected;
|
||||
|
||||
public StartWithMatcher(String expected)
|
||||
{
|
||||
this.expected = expected;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean match(String input)
|
||||
{
|
||||
return input.startsWith(expected);
|
||||
}
|
||||
}
|
||||
|
||||
static class ContainsMatcher implements MessageMatcher
|
||||
{
|
||||
private String expected;
|
||||
|
||||
public ContainsMatcher(String expected)
|
||||
{
|
||||
this.expected = expected;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean match(String input)
|
||||
{
|
||||
return input.contains(expected);
|
||||
}
|
||||
}
|
||||
|
||||
static class ExceptionMatcher implements Matcher
|
||||
{
|
||||
private MessageMatcher messageMatcher;
|
||||
private Class<? extends Throwable> exceptionClass;
|
||||
|
||||
public ExceptionMatcher(Class<? extends Throwable> exceptionClass, MessageMatcher exceptionMessageMatcher)
|
||||
{
|
||||
this.exceptionClass = exceptionClass;
|
||||
this.messageMatcher = exceptionMessageMatcher;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean matches(Object item)
|
||||
{
|
||||
if (!(item.getClass().equals(exceptionClass))) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return this.messageMatcher.match(((Throwable) item).getMessage());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void describeMismatch(Object item, Description mismatchDescription)
|
||||
{
|
||||
}
|
||||
|
||||
@Override
|
||||
public void _dont_implement_Matcher___instead_extend_BaseMatcher_()
|
||||
{
|
||||
}
|
||||
|
||||
@Override
|
||||
public void describeTo(Description description)
|
||||
{
|
||||
}
|
||||
}
|
||||
|
||||
@Rule
|
||||
public ExpectedException expectedException = ExpectedException.none();
|
||||
|
||||
@Test
|
||||
public void testClusterPriority() throws IOException
|
||||
{
|
||||
ObjectMapper mapper = new ObjectMapper();
|
||||
RedisCacheConfig fromJson = mapper.readValue("{\"expiration\": 1000,"
|
||||
+ "\"cluster\": {"
|
||||
+ "\"nodes\": \"127.0.0.1:6379\""
|
||||
+ "},"
|
||||
+ "\"host\": \"127.0.0.1\","
|
||||
+ "\"port\": 6379"
|
||||
+ "}", RedisCacheConfig.class);
|
||||
|
||||
try (Cache cache = RedisCacheFactory.create(fromJson)) {
|
||||
Assert.assertTrue(cache instanceof RedisClusterCache);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClusterInvalidNode() throws IOException
|
||||
{
|
||||
ObjectMapper mapper = new ObjectMapper();
|
||||
RedisCacheConfig fromJson = mapper.readValue(
|
||||
"{\"expiration\": 1000,"
|
||||
+ "\"cluster\": {"
|
||||
+ "\"nodes\": \"127.0.0.1\"" //<===Invalid Node
|
||||
+ "}"
|
||||
+ "}",
|
||||
RedisCacheConfig.class
|
||||
);
|
||||
|
||||
expectedException.expect(new ExceptionMatcher(
|
||||
IAE.class,
|
||||
new StartWithMatcher("Invalid redis cluster")
|
||||
));
|
||||
RedisCacheFactory.create(fromJson);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClusterLackOfPort() throws IOException
|
||||
{
|
||||
ObjectMapper mapper = new ObjectMapper();
|
||||
RedisCacheConfig fromJson = mapper.readValue(
|
||||
"{\"expiration\":1000,"
|
||||
+ "\"cluster\": {"
|
||||
+ "\"nodes\": \"127.0.0.1:\""
|
||||
+ "}"
|
||||
+ "}",
|
||||
RedisCacheConfig.class
|
||||
);
|
||||
|
||||
expectedException.expect(new ExceptionMatcher(
|
||||
IAE.class,
|
||||
new StartWithMatcher("Invalid port")
|
||||
));
|
||||
RedisCacheFactory.create(fromJson);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInvalidClusterNodePort0() throws IOException
|
||||
{
|
||||
ObjectMapper mapper = new ObjectMapper();
|
||||
RedisCacheConfig fromJson = mapper.readValue(
|
||||
"{\"expiration\": 1000,"
|
||||
+ "\"cluster\": {"
|
||||
+ "\"nodes\": \"127.0.0.1:0\"" //<===Invalid Port
|
||||
+ "}"
|
||||
+ "}",
|
||||
RedisCacheConfig.class
|
||||
);
|
||||
|
||||
expectedException.expect(new ExceptionMatcher(
|
||||
IAE.class,
|
||||
new ContainsMatcher("Invalid port")
|
||||
));
|
||||
RedisCacheFactory.create(fromJson);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInvalidClusterNodePort65536() throws IOException
|
||||
{
|
||||
ObjectMapper mapper = new ObjectMapper();
|
||||
RedisCacheConfig fromJson = mapper.readValue(
|
||||
"{\"expiration\": 1000,"
|
||||
+ "\"cluster\": {"
|
||||
+ "\"nodes\": \"127.0.0.1:65536\"" //<===Invalid Port
|
||||
+ "}"
|
||||
+ "}",
|
||||
RedisCacheConfig.class
|
||||
);
|
||||
|
||||
expectedException.expect(new ExceptionMatcher(
|
||||
IAE.class,
|
||||
new ContainsMatcher("Invalid port")
|
||||
));
|
||||
RedisCacheFactory.create(fromJson);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNoClusterAndHost() throws IOException
|
||||
{
|
||||
ObjectMapper mapper = new ObjectMapper();
|
||||
RedisCacheConfig fromJson = mapper.readValue(
|
||||
"{\"expiration\": 1000"
|
||||
+ "}",
|
||||
RedisCacheConfig.class
|
||||
);
|
||||
|
||||
expectedException.expect(new ExceptionMatcher(
|
||||
IAE.class,
|
||||
new ContainsMatcher("no redis server")
|
||||
));
|
||||
RedisCacheFactory.create(fromJson);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,146 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.client.cache;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fiftyonred.mock_jedis.MockJedisCluster;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.primitives.Ints;
|
||||
import org.apache.druid.java.util.common.StringUtils;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import redis.clients.jedis.HostAndPort;
|
||||
import redis.clients.jedis.JedisPoolConfig;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class RedisClusterCacheTest
|
||||
{
|
||||
private static final byte[] HI = StringUtils.toUtf8("hiiiiiiiiiiiiiiiiiii");
|
||||
private static final byte[] HO = StringUtils.toUtf8("hooooooooooooooooooo");
|
||||
|
||||
private final RedisCacheConfig cacheConfig = new RedisCacheConfig()
|
||||
{
|
||||
@Override
|
||||
public DurationConfig getTimeout()
|
||||
{
|
||||
return new DurationConfig("PT2S");
|
||||
}
|
||||
|
||||
@Override
|
||||
public DurationConfig getExpiration()
|
||||
{
|
||||
return new DurationConfig("PT1H");
|
||||
}
|
||||
};
|
||||
|
||||
private RedisClusterCache cache;
|
||||
|
||||
@Before
|
||||
public void setUp()
|
||||
{
|
||||
JedisPoolConfig poolConfig = new JedisPoolConfig();
|
||||
poolConfig.setMaxTotal(cacheConfig.getMaxTotalConnections());
|
||||
poolConfig.setMaxIdle(cacheConfig.getMaxIdleConnections());
|
||||
poolConfig.setMinIdle(cacheConfig.getMinIdleConnections());
|
||||
|
||||
// original MockJedisCluster does not provide full support for all public get/set interfaces
// some methods must be overridden for test cases
|
||||
cache = new RedisClusterCache(new MockJedisCluster(Collections.singleton(new HostAndPort("localhost", 6379)))
|
||||
{
|
||||
Map<String, byte[]> cacheStorage = new HashMap<>();
|
||||
|
||||
@Override
|
||||
public String setex(final byte[] key, final int seconds, final byte[] value)
|
||||
{
|
||||
cacheStorage.put(StringUtils.encodeBase64String(key), value);
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] get(final byte[] key)
|
||||
{
|
||||
return cacheStorage.get(StringUtils.encodeBase64String(key));
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<byte[]> mget(final byte[]... keys)
|
||||
{
|
||||
List<byte[]> ret = new ArrayList<>();
|
||||
for (byte[] key : keys) {
|
||||
String k = StringUtils.encodeBase64String(key);
|
||||
byte[] value = cacheStorage.get(k);
|
||||
if (value != null) {
|
||||
ret.add(value);
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
}, cacheConfig);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testConfig() throws JsonProcessingException
|
||||
{
|
||||
ObjectMapper mapper = new ObjectMapper();
|
||||
RedisCacheConfig fromJson = mapper.readValue("{\"expiration\": 1000}", RedisCacheConfig.class);
|
||||
Assert.assertEquals(1, fromJson.getExpiration().getSeconds());
|
||||
|
||||
fromJson = mapper.readValue("{\"expiration\": \"PT1H\"}", RedisCacheConfig.class);
|
||||
Assert.assertEquals(3600, fromJson.getExpiration().getSeconds());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCache()
|
||||
{
|
||||
Assert.assertNull(cache.get(new Cache.NamedKey("the", HI)));
|
||||
|
||||
Cache.NamedKey key1 = new Cache.NamedKey("the", HI);
|
||||
Cache.NamedKey key2 = new Cache.NamedKey("the", HO);
|
||||
Cache.NamedKey key3 = new Cache.NamedKey("a", HI);
|
||||
|
||||
//test put and get
|
||||
cache.put(key1, new byte[]{1, 2, 3, 4});
|
||||
cache.put(key2, new byte[]{2, 3, 4, 5});
|
||||
cache.put(key3, new byte[]{3, 4, 5, 6});
|
||||
Assert.assertEquals(0x01020304, Ints.fromByteArray(cache.get(key1)));
|
||||
Assert.assertEquals(0x02030405, Ints.fromByteArray(cache.get(key2)));
|
||||
Assert.assertEquals(0x03040506, Ints.fromByteArray(cache.get(key3)));
|
||||
|
||||
//test multi get
|
||||
Map<Cache.NamedKey, byte[]> result = cache.getBulk(
|
||||
Lists.newArrayList(
|
||||
key1,
|
||||
key2,
|
||||
key3
|
||||
)
|
||||
);
|
||||
Assert.assertEquals(0x01020304, Ints.fromByteArray(result.get(key1)));
|
||||
Assert.assertEquals(0x02030405, Ints.fromByteArray(result.get(key2)));
|
||||
Assert.assertEquals(0x03040506, Ints.fromByteArray(result.get(key3)));
|
||||
}
|
||||
}
|
|
@ -19,6 +19,7 @@
|
|||
|
||||
package org.apache.druid.client.cache;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fiftyonred.mock_jedis.MockJedis;
|
||||
import com.fiftyonred.mock_jedis.MockJedisPool;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
|
@ -41,24 +42,24 @@ import redis.clients.jedis.JedisPoolConfig;
|
|||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
|
||||
public class RedisCacheTest
|
||||
public class RedisStandaloneCacheTest
|
||||
{
|
||||
private static final byte[] HI = StringUtils.toUtf8("hiiiiiiiiiiiiiiiiiii");
|
||||
private static final byte[] HO = StringUtils.toUtf8("hooooooooooooooooooo");
|
||||
|
||||
private RedisCache cache;
|
||||
private RedisStandaloneCache cache;
|
||||
private final RedisCacheConfig cacheConfig = new RedisCacheConfig()
|
||||
{
|
||||
@Override
|
||||
public int getTimeout()
|
||||
public DurationConfig getTimeout()
|
||||
{
|
||||
return 10;
|
||||
return new DurationConfig("PT2S");
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getExpiration()
|
||||
public DurationConfig getExpiration()
|
||||
{
|
||||
return 3600000;
|
||||
return new DurationConfig("PT1H");
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -82,13 +83,15 @@ public class RedisCacheTest
|
|||
}
|
||||
});
|
||||
|
||||
cache = RedisCache.create(pool, cacheConfig);
|
||||
cache = new RedisStandaloneCache(pool, cacheConfig);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBasicInjection() throws Exception
|
||||
{
|
||||
final RedisCacheConfig config = new RedisCacheConfig();
|
||||
String json = "{ \"host\": \"localhost\", \"port\": 6379, \"expiration\": 3600}";
|
||||
final RedisCacheConfig config = new ObjectMapper().readValue(json, RedisCacheConfig.class);
|
||||
|
||||
Injector injector = Initialization.makeInjectorWithModules(
|
||||
GuiceInjectors.makeStartupInjector(), ImmutableList.of(
|
||||
binder -> {
|
||||
|
@ -96,6 +99,9 @@ public class RedisCacheTest
|
|||
binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0);
|
||||
binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(-1);
|
||||
|
||||
binder.bindConstant().annotatedWith(Names.named("host")).to("localhost");
|
||||
binder.bindConstant().annotatedWith(Names.named("port")).to(6379);
|
||||
|
||||
binder.bind(RedisCacheConfig.class).toInstance(config);
|
||||
binder.bind(Cache.class).toProvider(RedisCacheProviderWithConfig.class).in(ManageLifecycle.class);
|
||||
}
|
||||
|
@ -105,7 +111,7 @@ public class RedisCacheTest
|
|||
lifecycle.start();
|
||||
try {
|
||||
Cache cache = injector.getInstance(Cache.class);
|
||||
Assert.assertEquals(RedisCache.class, cache.getClass());
|
||||
Assert.assertEquals(RedisStandaloneCache.class, cache.getClass());
|
||||
}
|
||||
finally {
|
||||
lifecycle.stop();
|
||||
|
@ -206,7 +212,7 @@ class RedisCacheProviderWithConfig extends RedisCacheProvider
|
|||
@Override
|
||||
public Cache get()
|
||||
{
|
||||
return RedisCache.create(config);
|
||||
return RedisCacheFactory.create(config);
|
||||
}
|
||||
}
|
||||
|
|
@ -557,6 +557,8 @@ namespacePrefix
src
- ../docs/development/extensions-contrib/redis-cache.md
loadList
pull-deps
PT2S
- ../docs/development/extensions-contrib/sqlserver.md
com.microsoft.sqlserver.jdbc.SQLServerDriver
sqljdbc