mirror of https://github.com/apache/druid.git
commit
020a706ac4
|
@ -89,24 +89,6 @@ You can optionally only configure caching to be enabled on the broker by setting
|
||||||
|--------|---------------|-----------|-------|
|
|--------|---------------|-----------|-------|
|
||||||
|`druid.broker.cache.useCache`|true, false|Enable the cache on the broker.|false|
|
|`druid.broker.cache.useCache`|true, false|Enable the cache on the broker.|false|
|
||||||
|`druid.broker.cache.populateCache`|true, false|Populate the cache on the broker.|false|
|
|`druid.broker.cache.populateCache`|true, false|Populate the cache on the broker.|false|
|
||||||
|`druid.cache.type`|`local`, `memcached`|The type of cache to use for queries.|`local`|
|
|`druid.broker.cache.unCacheable`|All druid query types|All query types to not cache.|`["groupBy", "select"]`|
|
||||||
|`druid.broker.cache.unCacheable`|All druid query types|All query types to not cache.|["groupBy", "select"]|
|
|
||||||
|
|
||||||
#### Local Cache
|
See [cache configuration](caching.html) for how to configure cache settings.
|
||||||
|
|
||||||
|Property|Description|Default|
|
|
||||||
|--------|-----------|-------|
|
|
||||||
|`druid.cache.sizeInBytes`|Maximum cache size in bytes. Zero disables caching.|0|
|
|
||||||
|`druid.cache.initialSize`|Initial size of the hashtable backing the cache.|500000|
|
|
||||||
|`druid.cache.logEvictionCount`|If non-zero, log cache eviction every `logEvictionCount` items.|0|
|
|
||||||
|
|
||||||
#### Memcache
|
|
||||||
|
|
||||||
|Property|Description|Default|
|
|
||||||
|--------|-----------|-------|
|
|
||||||
|`druid.cache.expiration`|Memcached [expiration time](https://code.google.com/p/memcached/wiki/NewCommands#Standard_Protocol).|2592000 (30 days)|
|
|
||||||
|`druid.cache.timeout`|Maximum time in milliseconds to wait for a response from Memcached.|500|
|
|
||||||
|`druid.cache.hosts`|Command separated list of Memcached hosts `<host:port>`.|none|
|
|
||||||
|`druid.cache.maxObjectSize`|Maximum object size in bytes for a Memcached object.|52428800 (50 MB)|
|
|
||||||
|`druid.cache.memcachedPrefix`|Key prefix for all keys in Memcached.|druid|
|
|
||||||
|`druid.cache.numConnections`|Number of memcached connections to use.|1|
|
|
||||||
|
|
|
@ -0,0 +1,64 @@
|
||||||
|
---
|
||||||
|
layout: doc_page
|
||||||
|
---
|
||||||
|
|
||||||
|
# Caching
|
||||||
|
|
||||||
|
Caching can optionally be enabled on the broker and / or historical nodes.
|
||||||
|
See the [broker](broker.html#caching) and [historical](historical.html#caching)
|
||||||
|
configuration options for how to enable it for individual node types.
|
||||||
|
|
||||||
|
Druid uses a local in-memory cache by default, unless a diffrent type of cache is specified.
|
||||||
|
Use the `druid.cache.type` configuration to set a different kind of cache.
|
||||||
|
|
||||||
|
|
||||||
|
## Cache configuration
|
||||||
|
|
||||||
|
Cache settings are set globally, so the same configuration can be re-used
|
||||||
|
for both broker and historical nodes, when defined in the common properties file.
|
||||||
|
|
||||||
|
|Property|Possible Values|Description|Default|
|
||||||
|
|--------|---------------|-----------|-------|
|
||||||
|
|`druid.cache.type`|`local`, `memcached`, `hybrid`|The type of cache to use for queries. See below of the configuration options for each cache type|`local`|
|
||||||
|
|
||||||
|
|
||||||
|
#### Local Cache
|
||||||
|
|
||||||
|
A simple in-memory LRU cache.
|
||||||
|
|
||||||
|
|Property|Description|Default|
|
||||||
|
|--------|-----------|-------|
|
||||||
|
|`druid.cache.sizeInBytes`|Maximum cache size in bytes. Zero disables caching.|0|
|
||||||
|
|`druid.cache.initialSize`|Initial size of the hashtable backing the cache.|500000|
|
||||||
|
|`druid.cache.logEvictionCount`|If non-zero, log cache eviction every `logEvictionCount` items.|0|
|
||||||
|
|
||||||
|
|
||||||
|
#### Memcached
|
||||||
|
|
||||||
|
Uses memcached as cache backend. This allows all nodes to share the same cache.
|
||||||
|
|
||||||
|
|Property|Description|Default|
|
||||||
|
|--------|-----------|-------|
|
||||||
|
|`druid.cache.expiration`|Memcached [expiration time](https://code.google.com/p/memcached/wiki/NewCommands#Standard_Protocol).|2592000 (30 days)|
|
||||||
|
|`druid.cache.timeout`|Maximum time in milliseconds to wait for a response from Memcached.|500|
|
||||||
|
|`druid.cache.hosts`|Command separated list of Memcached hosts `<host:port>`.|none|
|
||||||
|
|`druid.cache.maxObjectSize`|Maximum object size in bytes for a Memcached object.|52428800 (50 MB)|
|
||||||
|
|`druid.cache.memcachedPrefix`|Key prefix for all keys in Memcached.|druid|
|
||||||
|
|`druid.cache.numConnections`|Number of memcached connections to use.|1|
|
||||||
|
|
||||||
|
|
||||||
|
#### Hybrid
|
||||||
|
|
||||||
|
Uses a combination of any two caches as a two-level L1 / L2 cache.
|
||||||
|
This may be used to combine a local in-memory cache with a remote memcached cache.
|
||||||
|
|
||||||
|
Cache requests will first check L1 cache before checking L2.
|
||||||
|
If there is an L1 miss and L2 hit, it will also populate L1.
|
||||||
|
|
||||||
|
|
||||||
|
|Property|Description|Default|
|
||||||
|
|--------|-----------|-------|
|
||||||
|
|`druid.cache.l1.type`|type of cache to use for L1 cache. See `druid.cache.type` configuration for valid types.|`local`|
|
||||||
|
|`druid.cache.l2.type`|type of cache to use for L2 cache. See `druid.cache.type` configuration for valid types.|`local`|
|
||||||
|
|`druid.cache.l1.*`|Any property valid for the given type of L1 cache can be set using this prefix. For instance, if you are using a `local` L1 cache, specify `druid.cache.l1.sizeInBytes` to set its size.|defaults are the same as for the given cache type.|
|
||||||
|
|`druid.cache.l2.*`|Prefix for L2 cache settings, see description for L1.|defaults are the same as for the given cache type.|
|
|
@ -81,23 +81,7 @@ You can optionally only configure caching to be enabled on the historical by set
|
||||||
|--------|---------------|-----------|-------|
|
|--------|---------------|-----------|-------|
|
||||||
|`druid.historical.cache.useCache`|true, false|Enable the cache on the historical.|false|
|
|`druid.historical.cache.useCache`|true, false|Enable the cache on the historical.|false|
|
||||||
|`druid.historical.cache.populateCache`|true, false|Populate the cache on the historical.|false|
|
|`druid.historical.cache.populateCache`|true, false|Populate the cache on the historical.|false|
|
||||||
|`druid.cache.type`|`local`, `memcached`|The type of cache to use for queries.|`local`|
|
|
||||||
|`druid.historical.cache.unCacheable`|All druid query types|All query types to not cache.|["groupBy", "select"]|
|
|`druid.historical.cache.unCacheable`|All druid query types|All query types to not cache.|["groupBy", "select"]|
|
||||||
|
|
||||||
#### Local Cache
|
|
||||||
|
|
||||||
|Property|Description|Default|
|
See [cache configuration](caching.html) for how to configure cache settings.
|
||||||
|--------|-----------|-------|
|
|
||||||
|`druid.cache.sizeInBytes`|Maximum cache size in bytes. Zero disables caching.|0|
|
|
||||||
|`druid.cache.initialSize`|Initial size of the hashtable backing the cache.|500000|
|
|
||||||
|`druid.cache.logEvictionCount`|If non-zero, log cache eviction every `logEvictionCount` items.|0|
|
|
||||||
|
|
||||||
#### Memcache
|
|
||||||
|
|
||||||
|Property|Description|Default|
|
|
||||||
|--------|-----------|-------|
|
|
||||||
|`druid.cache.expiration`|Memcached [expiration time](https://code.google.com/p/memcached/wiki/NewCommands#Standard_Protocol).|2592000 (30 days)|
|
|
||||||
|`druid.cache.timeout`|Maximum time in milliseconds to wait for a response from Memcached.|500|
|
|
||||||
|`druid.cache.hosts`|Command separated list of Memcached hosts `<host:port>`.|none|
|
|
||||||
|`druid.cache.maxObjectSize`|Maximum object size in bytes for a Memcached object.|52428800 (50 MB)|
|
|
||||||
|`druid.cache.memcachedPrefix`|Key prefix for all keys in Memcached.|druid|
|
|
||||||
|
|
|
@ -32,6 +32,13 @@ public interface Cache
|
||||||
{
|
{
|
||||||
public byte[] get(NamedKey key);
|
public byte[] get(NamedKey key);
|
||||||
public void put(NamedKey key, byte[] value);
|
public void put(NamedKey key, byte[] value);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Resulting map should not contain any null values (i.e. cache misses should not be included)
|
||||||
|
*
|
||||||
|
* @param keys
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
public Map<NamedKey, byte[]> getBulk(Iterable<NamedKey> keys);
|
public Map<NamedKey, byte[]> getBulk(Iterable<NamedKey> keys);
|
||||||
|
|
||||||
public void close(String namespace);
|
public void close(String namespace);
|
||||||
|
|
|
@ -24,7 +24,8 @@ import com.google.inject.Provider;
|
||||||
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = LocalCacheProvider.class)
|
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = LocalCacheProvider.class)
|
||||||
@JsonSubTypes(value = {
|
@JsonSubTypes(value = {
|
||||||
@JsonSubTypes.Type(name = "local", value = LocalCacheProvider.class),
|
@JsonSubTypes.Type(name = "local", value = LocalCacheProvider.class),
|
||||||
@JsonSubTypes.Type(name = "memcached", value = MemcachedCacheProvider.class)
|
@JsonSubTypes.Type(name = "memcached", value = MemcachedCacheProvider.class),
|
||||||
|
@JsonSubTypes.Type(name = "hybrid", value = HybridCacheProvider.class)
|
||||||
})
|
})
|
||||||
public interface CacheProvider extends Provider<Cache>
|
public interface CacheProvider extends Provider<Cache>
|
||||||
{
|
{
|
||||||
|
|
|
@ -0,0 +1,132 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. Metamarkets licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.druid.client.cache;
|
||||||
|
|
||||||
|
import com.google.common.collect.Maps;
|
||||||
|
import com.google.common.collect.Sets;
|
||||||
|
import com.metamx.emitter.service.ServiceEmitter;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
|
public class HybridCache implements Cache
|
||||||
|
{
|
||||||
|
private final Cache level1;
|
||||||
|
private final Cache level2;
|
||||||
|
|
||||||
|
private final AtomicLong hitCount = new AtomicLong(0);
|
||||||
|
private final AtomicLong missCount = new AtomicLong(0);
|
||||||
|
|
||||||
|
public HybridCache(Cache level1, Cache level2)
|
||||||
|
{
|
||||||
|
this.level1 = level1;
|
||||||
|
this.level2 = level2;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public byte[] get(NamedKey key)
|
||||||
|
{
|
||||||
|
byte[] res = level1.get(key);
|
||||||
|
if (res == null) {
|
||||||
|
res = level2.get(key);
|
||||||
|
if(res != null) {
|
||||||
|
level1.put(key, res);
|
||||||
|
hitCount.incrementAndGet();
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
missCount.incrementAndGet();
|
||||||
|
}
|
||||||
|
if (res != null) {
|
||||||
|
hitCount.incrementAndGet();
|
||||||
|
}
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void put(NamedKey key, byte[] value)
|
||||||
|
{
|
||||||
|
level1.put(key, value);
|
||||||
|
level2.put(key, value);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Map<NamedKey, byte[]> getBulk(Iterable<NamedKey> keys)
|
||||||
|
{
|
||||||
|
Set<NamedKey> remaining = Sets.newHashSet(keys);
|
||||||
|
Map<NamedKey, byte[]> res = level1.getBulk(keys);
|
||||||
|
hitCount.addAndGet(res.size());
|
||||||
|
|
||||||
|
remaining = Sets.difference(remaining, res.keySet());
|
||||||
|
|
||||||
|
if (!remaining.isEmpty()) {
|
||||||
|
Map<NamedKey, byte[]> res2 = level2.getBulk(remaining);
|
||||||
|
for(Map.Entry<NamedKey, byte[]> entry : res2.entrySet()) {
|
||||||
|
level1.put(entry.getKey(), entry.getValue());
|
||||||
|
}
|
||||||
|
|
||||||
|
int size = res2.size();
|
||||||
|
hitCount.addAndGet(size);
|
||||||
|
missCount.addAndGet(remaining.size() - size);
|
||||||
|
|
||||||
|
if (size != 0) {
|
||||||
|
res = Maps.newHashMap(res);
|
||||||
|
res.putAll(res2);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close(String namespace)
|
||||||
|
{
|
||||||
|
level1.close(namespace);
|
||||||
|
level2.close(namespace);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CacheStats getStats()
|
||||||
|
{
|
||||||
|
CacheStats stats1 = level1.getStats();
|
||||||
|
CacheStats stats2 = level2.getStats();
|
||||||
|
return new CacheStats(
|
||||||
|
hitCount.get(),
|
||||||
|
missCount.get(),
|
||||||
|
stats1.getNumEntries() + stats2.getNumEntries(),
|
||||||
|
stats1.getSizeInBytes() + stats2.getSizeInBytes(),
|
||||||
|
stats1.getNumEvictions() + stats2.getNumEvictions(),
|
||||||
|
stats1.getNumTimeouts() + stats2.getNumTimeouts(),
|
||||||
|
stats1.getNumErrors() + stats2.getNumErrors()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isLocal()
|
||||||
|
{
|
||||||
|
return level1.isLocal() && level2.isLocal();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void doMonitor(ServiceEmitter emitter)
|
||||||
|
{
|
||||||
|
level1.doMonitor(emitter);
|
||||||
|
level2.doMonitor(emitter);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,47 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. Metamarkets licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.druid.client.cache;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.annotation.JacksonInject;
|
||||||
|
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
import com.google.inject.name.Named;
|
||||||
|
|
||||||
|
public class HybridCacheProvider implements CacheProvider
|
||||||
|
{
|
||||||
|
final CacheProvider level1;
|
||||||
|
final CacheProvider level2;
|
||||||
|
|
||||||
|
@JsonCreator
|
||||||
|
public HybridCacheProvider(
|
||||||
|
@JacksonInject @Named("l1") CacheProvider level1,
|
||||||
|
@JacksonInject @Named("l2") CacheProvider level2
|
||||||
|
)
|
||||||
|
{
|
||||||
|
this.level1 = Preconditions.checkNotNull(level1, "l1 cache not specified for hybrid cache");
|
||||||
|
this.level2 = Preconditions.checkNotNull(level2, "l2 cache not specified for hybrid cache");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Cache get()
|
||||||
|
{
|
||||||
|
return new HybridCache(level1.get(), level2.get());
|
||||||
|
}
|
||||||
|
}
|
|
@ -103,7 +103,10 @@ public class MapCache implements Cache
|
||||||
{
|
{
|
||||||
Map<NamedKey, byte[]> retVal = Maps.newHashMap();
|
Map<NamedKey, byte[]> retVal = Maps.newHashMap();
|
||||||
for (NamedKey key : keys) {
|
for (NamedKey key : keys) {
|
||||||
retVal.put(key, get(key));
|
final byte[] value = get(key);
|
||||||
|
if (value != null) {
|
||||||
|
retVal.put(key, value);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return retVal;
|
return retVal;
|
||||||
}
|
}
|
||||||
|
|
|
@ -567,11 +567,13 @@ public class MemcachedCache implements Cache
|
||||||
for (Map.Entry<String, Object> entry : some.entrySet()) {
|
for (Map.Entry<String, Object> entry : some.entrySet()) {
|
||||||
final NamedKey key = keyLookup.get(entry.getKey());
|
final NamedKey key = keyLookup.get(entry.getKey());
|
||||||
final byte[] value = (byte[]) entry.getValue();
|
final byte[] value = (byte[]) entry.getValue();
|
||||||
|
if (value != null) {
|
||||||
results.put(
|
results.put(
|
||||||
key,
|
key,
|
||||||
value == null ? null : deserializeValue(key, value)
|
deserializeValue(key, value)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return results;
|
return results;
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,72 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. Metamarkets licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.druid.guice;
|
||||||
|
|
||||||
|
import com.google.inject.Binder;
|
||||||
|
import com.google.inject.Key;
|
||||||
|
import com.google.inject.Module;
|
||||||
|
import com.google.inject.name.Names;
|
||||||
|
import io.druid.client.cache.Cache;
|
||||||
|
import io.druid.client.cache.CacheProvider;
|
||||||
|
import io.druid.guice.annotations.Global;
|
||||||
|
|
||||||
|
public class CacheModule implements Module
|
||||||
|
{
|
||||||
|
|
||||||
|
public static final String DRUID_GLOBAL_CACHE_PREFIX = "druid.cache";
|
||||||
|
|
||||||
|
public final String prefix;
|
||||||
|
|
||||||
|
public CacheModule()
|
||||||
|
{
|
||||||
|
this.prefix = DRUID_GLOBAL_CACHE_PREFIX;
|
||||||
|
}
|
||||||
|
|
||||||
|
public CacheModule(String prefix)
|
||||||
|
{
|
||||||
|
this.prefix = prefix;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void configure(Binder binder)
|
||||||
|
{
|
||||||
|
binder.bind(Cache.class).toProvider(Key.get(CacheProvider.class, Global.class)).in(ManageLifecycle.class);
|
||||||
|
JsonConfigProvider.bind(binder, prefix, CacheProvider.class, Global.class);
|
||||||
|
|
||||||
|
binder.install(new HybridCacheModule(prefix));
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class HybridCacheModule implements Module {
|
||||||
|
|
||||||
|
private final String prefix;
|
||||||
|
|
||||||
|
public HybridCacheModule(String prefix)
|
||||||
|
{
|
||||||
|
this.prefix = prefix;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void configure(Binder binder)
|
||||||
|
{
|
||||||
|
JsonConfigProvider.bind(binder, prefix + ".l1", CacheProvider.class, Names.named("l1"));
|
||||||
|
JsonConfigProvider.bind(binder, prefix + ".l2", CacheProvider.class, Names.named("l2"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,175 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. Metamarkets licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.druid.client.cache;
|
||||||
|
|
||||||
|
import com.google.common.collect.ImmutableList;
|
||||||
|
import com.google.common.collect.Sets;
|
||||||
|
import com.google.common.primitives.Ints;
|
||||||
|
import com.google.inject.Binder;
|
||||||
|
import com.google.inject.Injector;
|
||||||
|
import com.google.inject.Key;
|
||||||
|
import com.google.inject.Module;
|
||||||
|
import com.google.inject.name.Names;
|
||||||
|
import io.druid.guice.CacheModule;
|
||||||
|
import io.druid.guice.GuiceInjectors;
|
||||||
|
import io.druid.guice.annotations.Global;
|
||||||
|
import io.druid.initialization.Initialization;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
public class HybridCacheTest
|
||||||
|
{
|
||||||
|
private static final byte[] HI = "hi".getBytes();
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testInjection() throws Exception
|
||||||
|
{
|
||||||
|
final String prefix = "testInjectHybridCache";
|
||||||
|
System.setProperty(prefix + ".type", "hybrid");
|
||||||
|
System.setProperty(prefix + ".l1.type", "local");
|
||||||
|
System.setProperty(prefix + ".l2.type", "memcached");
|
||||||
|
System.setProperty(prefix + ".l2.hosts", "localhost:11711");
|
||||||
|
|
||||||
|
final Injector injector = Initialization.makeInjectorWithModules(
|
||||||
|
GuiceInjectors.makeStartupInjector(), ImmutableList.<Module>of(
|
||||||
|
new Module()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void configure(Binder binder)
|
||||||
|
{
|
||||||
|
binder.bindConstant().annotatedWith(Names.named("serviceName")).to("hybridTest");
|
||||||
|
binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0);
|
||||||
|
|
||||||
|
binder.install(new CacheModule(prefix));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
)
|
||||||
|
);
|
||||||
|
final CacheProvider cacheProvider = injector.getInstance(Key.get(CacheProvider.class, Global.class));
|
||||||
|
Assert.assertNotNull(cacheProvider);
|
||||||
|
Assert.assertEquals(HybridCacheProvider.class, cacheProvider.getClass());
|
||||||
|
|
||||||
|
final Cache cache = cacheProvider.get();
|
||||||
|
Assert.assertNotNull(cache);
|
||||||
|
|
||||||
|
Assert.assertFalse(cache.isLocal());
|
||||||
|
Assert.assertEquals(LocalCacheProvider.class, ((HybridCacheProvider) cacheProvider).level1.getClass());
|
||||||
|
Assert.assertEquals(MemcachedCacheProvider.class, ((HybridCacheProvider) cacheProvider).level2.getClass());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSanity() throws Exception
|
||||||
|
{
|
||||||
|
final MapCache l1 = new MapCache(new ByteCountingLRUMap(1024 * 1024));
|
||||||
|
final MapCache l2 = new MapCache(new ByteCountingLRUMap(1024 * 1024));
|
||||||
|
HybridCache cache = new HybridCache(l1, l2);
|
||||||
|
|
||||||
|
final Cache.NamedKey key1 = new Cache.NamedKey("a", HI);
|
||||||
|
final Cache.NamedKey key2 = new Cache.NamedKey("b", HI);
|
||||||
|
final Cache.NamedKey key3 = new Cache.NamedKey("c", HI);
|
||||||
|
final Cache.NamedKey key4 = new Cache.NamedKey("d", HI);
|
||||||
|
|
||||||
|
final byte[] value1 = Ints.toByteArray(1);
|
||||||
|
final byte[] value2 = Ints.toByteArray(2);
|
||||||
|
final byte[] value3 = Ints.toByteArray(3);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
// test put puts to both
|
||||||
|
cache.put(key1, value1);
|
||||||
|
Assert.assertEquals(value1, l1.get(key1));
|
||||||
|
Assert.assertEquals(value1, l2.get(key1));
|
||||||
|
Assert.assertEquals(value1, cache.get(key1));
|
||||||
|
|
||||||
|
int hits = 0;
|
||||||
|
Assert.assertEquals(0, cache.getStats().getNumMisses());
|
||||||
|
Assert.assertEquals(++hits, cache.getStats().getNumHits());
|
||||||
|
|
||||||
|
// test l1
|
||||||
|
l1.put(key2, value2);
|
||||||
|
Assert.assertEquals(value2, cache.get(key2));
|
||||||
|
Assert.assertEquals(0, cache.getStats().getNumMisses());
|
||||||
|
Assert.assertEquals(++hits, cache.getStats().getNumHits());
|
||||||
|
|
||||||
|
// test l2
|
||||||
|
l2.put(key3, value3);
|
||||||
|
Assert.assertEquals(value3, cache.get(key3));
|
||||||
|
Assert.assertEquals(0, cache.getStats().getNumMisses());
|
||||||
|
Assert.assertEquals(++hits, cache.getStats().getNumHits());
|
||||||
|
|
||||||
|
|
||||||
|
// test bulk get with l1 and l2
|
||||||
|
{
|
||||||
|
final HashSet<Cache.NamedKey> keys = Sets.newHashSet(key1, key2, key3);
|
||||||
|
Map<Cache.NamedKey, byte[]> res = cache.getBulk(keys);
|
||||||
|
Assert.assertNotNull(res);
|
||||||
|
Assert.assertEquals(keys, res.keySet());
|
||||||
|
Assert.assertArrayEquals(value1, res.get(key1));
|
||||||
|
Assert.assertArrayEquals(value2, res.get(key2));
|
||||||
|
Assert.assertArrayEquals(value3, res.get(key3));
|
||||||
|
|
||||||
|
hits += 3;
|
||||||
|
Assert.assertEquals(0, cache.getStats().getNumMisses());
|
||||||
|
Assert.assertEquals(hits, cache.getStats().getNumHits());
|
||||||
|
}
|
||||||
|
|
||||||
|
// test bulk get with l1 entries only
|
||||||
|
{
|
||||||
|
final HashSet<Cache.NamedKey> keys = Sets.newHashSet(key1, key2);
|
||||||
|
Map<Cache.NamedKey, byte[]> res = cache.getBulk(keys);
|
||||||
|
Assert.assertNotNull(res);
|
||||||
|
Assert.assertEquals(keys, res.keySet());
|
||||||
|
Assert.assertArrayEquals(value1, res.get(key1));
|
||||||
|
Assert.assertArrayEquals(value2, res.get(key2));
|
||||||
|
|
||||||
|
hits += 2;
|
||||||
|
Assert.assertEquals(0, cache.getStats().getNumMisses());
|
||||||
|
Assert.assertEquals(hits, cache.getStats().getNumHits());
|
||||||
|
}
|
||||||
|
|
||||||
|
int misses = 0;
|
||||||
|
Assert.assertNull(cache.get(key4));
|
||||||
|
Assert.assertEquals(++misses, cache.getStats().getNumMisses());
|
||||||
|
|
||||||
|
Assert.assertTrue(cache.getBulk(Sets.newHashSet(key4)).isEmpty());
|
||||||
|
Assert.assertEquals(++misses, cache.getStats().getNumMisses());
|
||||||
|
|
||||||
|
{
|
||||||
|
final Map<Cache.NamedKey, byte[]> res = cache.getBulk(Sets.newHashSet(key1, key4));
|
||||||
|
Assert.assertEquals(Sets.newHashSet(key1), res.keySet());
|
||||||
|
Assert.assertArrayEquals(value1, res.get(key1));
|
||||||
|
|
||||||
|
Assert.assertEquals(++hits, cache.getStats().getNumHits());
|
||||||
|
Assert.assertEquals(++misses, cache.getStats().getNumMisses());
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
final Map<Cache.NamedKey, byte[]> res = cache.getBulk(Sets.newHashSet(key3, key4));
|
||||||
|
Assert.assertEquals(Sets.newHashSet(key3), res.keySet());
|
||||||
|
Assert.assertArrayEquals(value3, res.get(key3));
|
||||||
|
|
||||||
|
Assert.assertEquals(++hits, cache.getStats().getNumHits());
|
||||||
|
Assert.assertEquals(++misses, cache.getStats().getNumMisses());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -37,6 +37,7 @@ import com.metamx.metrics.AbstractMonitor;
|
||||||
import com.metamx.metrics.MonitorScheduler;
|
import com.metamx.metrics.MonitorScheduler;
|
||||||
import io.druid.collections.ResourceHolder;
|
import io.druid.collections.ResourceHolder;
|
||||||
import io.druid.collections.StupidResourceHolder;
|
import io.druid.collections.StupidResourceHolder;
|
||||||
|
import io.druid.guice.CacheModule;
|
||||||
import io.druid.guice.GuiceInjectors;
|
import io.druid.guice.GuiceInjectors;
|
||||||
import io.druid.guice.JsonConfigProvider;
|
import io.druid.guice.JsonConfigProvider;
|
||||||
import io.druid.guice.ManageLifecycle;
|
import io.druid.guice.ManageLifecycle;
|
||||||
|
@ -175,8 +176,6 @@ public class MemcachedCacheTest
|
||||||
final String uuid = UUID.randomUUID().toString();
|
final String uuid = UUID.randomUUID().toString();
|
||||||
System.setProperty(uuid + ".type", "memcached");
|
System.setProperty(uuid + ".type", "memcached");
|
||||||
System.setProperty(uuid + ".hosts", "localhost");
|
System.setProperty(uuid + ".hosts", "localhost");
|
||||||
final MonitorScheduler monitorScheduler = EasyMock.createNiceMock(MonitorScheduler.class);
|
|
||||||
EasyMock.replay(monitorScheduler);
|
|
||||||
final Injector injector = Initialization.makeInjectorWithModules(
|
final Injector injector = Initialization.makeInjectorWithModules(
|
||||||
GuiceInjectors.makeStartupInjector(), ImmutableList.<Module>of(
|
GuiceInjectors.makeStartupInjector(), ImmutableList.<Module>of(
|
||||||
new Module()
|
new Module()
|
||||||
|
@ -184,7 +183,9 @@ public class MemcachedCacheTest
|
||||||
@Override
|
@Override
|
||||||
public void configure(Binder binder)
|
public void configure(Binder binder)
|
||||||
{
|
{
|
||||||
binder.bind(MonitorScheduler.class).toInstance(monitorScheduler);
|
binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/test/memcached");
|
||||||
|
binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0);
|
||||||
|
|
||||||
binder.bind(Cache.class).toProvider(CacheProvider.class);
|
binder.bind(Cache.class).toProvider(CacheProvider.class);
|
||||||
JsonConfigProvider.bind(binder, uuid, CacheProvider.class);
|
JsonConfigProvider.bind(binder, uuid, CacheProvider.class);
|
||||||
}
|
}
|
||||||
|
|
|
@ -33,6 +33,7 @@ import io.druid.client.cache.CacheProvider;
|
||||||
import io.druid.client.selector.CustomTierSelectorStrategyConfig;
|
import io.druid.client.selector.CustomTierSelectorStrategyConfig;
|
||||||
import io.druid.client.selector.ServerSelectorStrategy;
|
import io.druid.client.selector.ServerSelectorStrategy;
|
||||||
import io.druid.client.selector.TierSelectorStrategy;
|
import io.druid.client.selector.TierSelectorStrategy;
|
||||||
|
import io.druid.guice.CacheModule;
|
||||||
import io.druid.guice.Jerseys;
|
import io.druid.guice.Jerseys;
|
||||||
import io.druid.guice.JsonConfigProvider;
|
import io.druid.guice.JsonConfigProvider;
|
||||||
import io.druid.guice.LazySingleton;
|
import io.druid.guice.LazySingleton;
|
||||||
|
@ -89,9 +90,9 @@ public class CliBroker extends ServerRunnable
|
||||||
binder.bind(BrokerServerView.class).in(LazySingleton.class);
|
binder.bind(BrokerServerView.class).in(LazySingleton.class);
|
||||||
binder.bind(TimelineServerView.class).to(BrokerServerView.class).in(LazySingleton.class);
|
binder.bind(TimelineServerView.class).to(BrokerServerView.class).in(LazySingleton.class);
|
||||||
|
|
||||||
binder.bind(Cache.class).toProvider(CacheProvider.class).in(ManageLifecycle.class);
|
|
||||||
JsonConfigProvider.bind(binder, "druid.cache", CacheProvider.class);
|
|
||||||
JsonConfigProvider.bind(binder, "druid.broker.cache", CacheConfig.class);
|
JsonConfigProvider.bind(binder, "druid.broker.cache", CacheConfig.class);
|
||||||
|
binder.install(new CacheModule());
|
||||||
|
|
||||||
JsonConfigProvider.bind(binder, "druid.broker.select", TierSelectorStrategy.class);
|
JsonConfigProvider.bind(binder, "druid.broker.select", TierSelectorStrategy.class);
|
||||||
JsonConfigProvider.bind(binder, "druid.broker.select.tier.custom", CustomTierSelectorStrategyConfig.class);
|
JsonConfigProvider.bind(binder, "druid.broker.select.tier.custom", CustomTierSelectorStrategyConfig.class);
|
||||||
JsonConfigProvider.bind(binder, "druid.broker.balancer", ServerSelectorStrategy.class);
|
JsonConfigProvider.bind(binder, "druid.broker.balancer", ServerSelectorStrategy.class);
|
||||||
|
|
|
@ -27,6 +27,7 @@ import io.druid.client.cache.Cache;
|
||||||
import io.druid.client.cache.CacheConfig;
|
import io.druid.client.cache.CacheConfig;
|
||||||
import io.druid.client.cache.CacheMonitor;
|
import io.druid.client.cache.CacheMonitor;
|
||||||
import io.druid.client.cache.CacheProvider;
|
import io.druid.client.cache.CacheProvider;
|
||||||
|
import io.druid.guice.CacheModule;
|
||||||
import io.druid.guice.Jerseys;
|
import io.druid.guice.Jerseys;
|
||||||
import io.druid.guice.JsonConfigProvider;
|
import io.druid.guice.JsonConfigProvider;
|
||||||
import io.druid.guice.LazySingleton;
|
import io.druid.guice.LazySingleton;
|
||||||
|
@ -85,9 +86,8 @@ public class CliHistorical extends ServerRunnable
|
||||||
|
|
||||||
LifecycleModule.register(binder, ZkCoordinator.class);
|
LifecycleModule.register(binder, ZkCoordinator.class);
|
||||||
|
|
||||||
binder.bind(Cache.class).toProvider(CacheProvider.class).in(ManageLifecycle.class);
|
|
||||||
JsonConfigProvider.bind(binder, "druid.cache", CacheProvider.class);
|
|
||||||
JsonConfigProvider.bind(binder, "druid.historical.cache", CacheConfig.class);
|
JsonConfigProvider.bind(binder, "druid.historical.cache", CacheConfig.class);
|
||||||
|
binder.install(new CacheModule());
|
||||||
MetricsModule.register(binder, CacheMonitor.class);
|
MetricsModule.register(binder, CacheMonitor.class);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue