Hybrid L1/L2 cache to combine local and remote caches

Xavier Léauté 2015-10-01 11:25:03 -04:00
parent 5db4c5a089
commit b11c0859e8
11 changed files with 459 additions and 43 deletions

View File

@@ -89,24 +89,6 @@ You can optionally only configure caching to be enabled on the broker by setting
 |--------|---------------|-----------|-------|
 |`druid.broker.cache.useCache`|true, false|Enable the cache on the broker.|false|
 |`druid.broker.cache.populateCache`|true, false|Populate the cache on the broker.|false|
-|`druid.cache.type`|`local`, `memcached`|The type of cache to use for queries.|`local`|
-|`druid.broker.cache.unCacheable`|All druid query types|All query types to not cache.|["groupBy", "select"]|
-
-#### Local Cache
-
-|Property|Description|Default|
-|--------|-----------|-------|
-|`druid.cache.sizeInBytes`|Maximum cache size in bytes. Zero disables caching.|0|
-|`druid.cache.initialSize`|Initial size of the hashtable backing the cache.|500000|
-|`druid.cache.logEvictionCount`|If non-zero, log cache eviction every `logEvictionCount` items.|0|
-
-#### Memcache
-
-|Property|Description|Default|
-|--------|-----------|-------|
-|`druid.cache.expiration`|Memcached [expiration time](https://code.google.com/p/memcached/wiki/NewCommands#Standard_Protocol).|2592000 (30 days)|
-|`druid.cache.timeout`|Maximum time in milliseconds to wait for a response from Memcached.|500|
-|`druid.cache.hosts`|Command separated list of Memcached hosts `<host:port>`.|none|
-|`druid.cache.maxObjectSize`|Maximum object size in bytes for a Memcached object.|52428800 (50 MB)|
-|`druid.cache.memcachedPrefix`|Key prefix for all keys in Memcached.|druid|
-|`druid.cache.numConnections`|Number of memcached connections to use.|1|
+|`druid.broker.cache.unCacheable`|All druid query types|All query types to not cache.|`["groupBy", "select"]`|
+
+See [cache configuration](caching.html) for how to configure cache settings.

View File

@@ -0,0 +1,64 @@
---
layout: doc_page
---
# Caching
Caching can optionally be enabled on the broker and / or historical nodes.
See the [broker](broker.html#caching) and [historical](historical.html#caching)
configuration options for how to enable it for individual node types.
Druid uses a local in-memory cache by default. Use the `druid.cache.type`
configuration to select a different type of cache.
## Cache configuration
Cache settings are global, so the same configuration can be reused for both
broker and historical nodes when defined in the common properties file.

|Property|Possible Values|Description|Default|
|--------|---------------|-----------|-------|
|`druid.cache.type`|`local`, `memcached`, `hybrid`|The type of cache to use for queries. See below for the configuration options for each cache type.|`local`|
#### Local Cache
A simple in-memory LRU cache.
|Property|Description|Default|
|--------|-----------|-------|
|`druid.cache.sizeInBytes`|Maximum cache size in bytes. Zero disables caching.|0|
|`druid.cache.initialSize`|Initial size of the hashtable backing the cache.|500000|
|`druid.cache.logEvictionCount`|If non-zero, log cache eviction every `logEvictionCount` items.|0|
#### Memcached
Uses memcached as the cache backend. This allows all nodes to share the same cache.
|Property|Description|Default|
|--------|-----------|-------|
|`druid.cache.expiration`|Memcached [expiration time](https://code.google.com/p/memcached/wiki/NewCommands#Standard_Protocol).|2592000 (30 days)|
|`druid.cache.timeout`|Maximum time in milliseconds to wait for a response from Memcached.|500|
|`druid.cache.hosts`|Comma-separated list of Memcached hosts `<host:port>`.|none|
|`druid.cache.maxObjectSize`|Maximum object size in bytes for a Memcached object.|52428800 (50 MB)|
|`druid.cache.memcachedPrefix`|Key prefix for all keys in Memcached.|druid|
|`druid.cache.numConnections`|Number of memcached connections to use.|1|
#### Hybrid

Uses a combination of any two caches as a two-level L1 / L2 cache.
This may be used to combine a local in-memory cache with a remote memcached cache.
Cache requests will first check L1 before checking L2.
On an L1 miss and an L2 hit, the entry is also populated back into L1.
An example configuration is given after the table below.

|Property|Description|Default|
|--------|-----------|-------|
|`druid.cache.l1.type`|Type of cache to use for the L1 cache. See `druid.cache.type` configuration for valid types.|`local`|
|`druid.cache.l2.type`|Type of cache to use for the L2 cache. See `druid.cache.type` configuration for valid types.|`local`|
|`druid.cache.l1.*`|Any property valid for the given type of L1 cache can be set using this prefix. For instance, if you are using a `local` L1 cache, specify `druid.cache.l1.sizeInBytes` to set its size.|Defaults are the same as for the given cache type.|
|`druid.cache.l2.*`|Prefix for L2 cache settings; see the description for L1.|Defaults are the same as for the given cache type.|
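
For example, a hybrid setup pairing a small local L1 with a shared memcached L2 might look like the following in the common properties file (the host and size values here are illustrative only):

```properties
druid.cache.type=hybrid
druid.cache.l1.type=local
druid.cache.l1.sizeInBytes=10000000
druid.cache.l2.type=memcached
druid.cache.l2.hosts=memcached.example.com:11211
```

With a configuration like this, each node keeps a 10 MB in-memory L1 while sharing the memcached L2, so an L2 hit also warms the local L1 on the node that requested the entry.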

View File

@@ -81,23 +81,7 @@ You can optionally only configure caching to be enabled on the historical by setting
 |--------|---------------|-----------|-------|
 |`druid.historical.cache.useCache`|true, false|Enable the cache on the historical.|false|
 |`druid.historical.cache.populateCache`|true, false|Populate the cache on the historical.|false|
-|`druid.cache.type`|`local`, `memcached`|The type of cache to use for queries.|`local`|
 |`druid.historical.cache.unCacheable`|All druid query types|All query types to not cache.|["groupBy", "select"]|
-
-#### Local Cache
-
-|Property|Description|Default|
-|--------|-----------|-------|
-|`druid.cache.sizeInBytes`|Maximum cache size in bytes. Zero disables caching.|0|
-|`druid.cache.initialSize`|Initial size of the hashtable backing the cache.|500000|
-|`druid.cache.logEvictionCount`|If non-zero, log cache eviction every `logEvictionCount` items.|0|
-
-#### Memcache
-
-|Property|Description|Default|
-|--------|-----------|-------|
-|`druid.cache.expiration`|Memcached [expiration time](https://code.google.com/p/memcached/wiki/NewCommands#Standard_Protocol).|2592000 (30 days)|
-|`druid.cache.timeout`|Maximum time in milliseconds to wait for a response from Memcached.|500|
-|`druid.cache.hosts`|Command separated list of Memcached hosts `<host:port>`.|none|
-|`druid.cache.maxObjectSize`|Maximum object size in bytes for a Memcached object.|52428800 (50 MB)|
-|`druid.cache.memcachedPrefix`|Key prefix for all keys in Memcached.|druid|
+
+See [cache configuration](caching.html) for how to configure cache settings.

View File

@@ -32,6 +32,13 @@ public interface Cache
 {
   public byte[] get(NamedKey key);
   public void put(NamedKey key, byte[] value);
+
+  /**
+   * Resulting map should not contain any null values (i.e. cache misses should not be included)
+   *
+   * @param keys the keys to look up in the cache
+   * @return a map of keys to cached values; keys that missed the cache are absent
+   */
   public Map<NamedKey, byte[]> getBulk(Iterable<NamedKey> keys);
+
   public void close(String namespace);
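
Since `getBulk` now guarantees a null-free result, callers can treat absent keys as misses without per-entry null checks. A minimal sketch of a caller relying on this contract (the helper class and method are hypothetical, not part of this commit):

```java
package io.druid.client.cache;

import com.google.common.collect.Sets;

import java.util.Map;
import java.util.Set;

// Hypothetical helper illustrating the strengthened getBulk contract.
class BulkGetExample
{
  /** Returns how many of the given keys were cache misses. */
  static int countMisses(Cache cache, Set<Cache.NamedKey> keys)
  {
    final Map<Cache.NamedKey, byte[]> hits = cache.getBulk(keys);
    // By contract, every returned value is non-null; misses are simply absent keys.
    return Sets.difference(keys, hits.keySet()).size();
  }
}
```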

View File

@@ -24,7 +24,8 @@ import com.google.inject.Provider;
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = LocalCacheProvider.class)
 @JsonSubTypes(value = {
     @JsonSubTypes.Type(name = "local", value = LocalCacheProvider.class),
-    @JsonSubTypes.Type(name = "memcached", value = MemcachedCacheProvider.class)
+    @JsonSubTypes.Type(name = "memcached", value = MemcachedCacheProvider.class),
+    @JsonSubTypes.Type(name = "hybrid", value = HybridCacheProvider.class)
 })
 public interface CacheProvider extends Provider<Cache>
 {

View File

@@ -0,0 +1,132 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.client.cache;

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.metamx.emitter.service.ServiceEmitter;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Composes two caches into an L1 / L2 hierarchy: reads check L1 first and fall
 * back to L2, populating L1 on an L2 hit; writes go to both levels.
 */
public class HybridCache implements Cache
{
  private final Cache level1;
  private final Cache level2;

  private final AtomicLong hitCount = new AtomicLong(0);
  private final AtomicLong missCount = new AtomicLong(0);

  public HybridCache(Cache level1, Cache level2)
  {
    this.level1 = level1;
    this.level2 = level2;
  }

  @Override
  public byte[] get(NamedKey key)
  {
    byte[] res = level1.get(key);
    if (res == null) {
      // L1 miss: try L2 and, on a hit, populate L1 so subsequent reads are local
      res = level2.get(key);
      if (res != null) {
        level1.put(key, res);
        hitCount.incrementAndGet();
        return res;
      }
      missCount.incrementAndGet();
    }
    if (res != null) {
      hitCount.incrementAndGet();
    }
    return res;
  }

  @Override
  public void put(NamedKey key, byte[] value)
  {
    // write-through to both tiers
    level1.put(key, value);
    level2.put(key, value);
  }

  @Override
  public Map<NamedKey, byte[]> getBulk(Iterable<NamedKey> keys)
  {
    Set<NamedKey> remaining = Sets.newHashSet(keys);
    Map<NamedKey, byte[]> res = level1.getBulk(keys);
    hitCount.addAndGet(res.size());

    remaining = Sets.difference(remaining, res.keySet());

    if (!remaining.isEmpty()) {
      Map<NamedKey, byte[]> res2 = level2.getBulk(remaining);

      // back-fill L1 with everything found in L2
      for (Map.Entry<NamedKey, byte[]> entry : res2.entrySet()) {
        level1.put(entry.getKey(), entry.getValue());
      }

      int size = res2.size();
      hitCount.addAndGet(size);
      missCount.addAndGet(remaining.size() - size);

      if (size != 0) {
        res = Maps.newHashMap(res);
        res.putAll(res2);
      }
    }
    return res;
  }

  @Override
  public void close(String namespace)
  {
    level1.close(namespace);
    level2.close(namespace);
  }

  @Override
  public CacheStats getStats()
  {
    CacheStats stats1 = level1.getStats();
    CacheStats stats2 = level2.getStats();
    return new CacheStats(
        hitCount.get(),
        missCount.get(),
        stats1.getNumEntries() + stats2.getNumEntries(),
        stats1.getSizeInBytes() + stats2.getSizeInBytes(),
        stats1.getNumEvictions() + stats2.getNumEvictions(),
        stats1.getNumTimeouts() + stats2.getNumTimeouts(),
        stats1.getNumErrors() + stats2.getNumErrors()
    );
  }

  @Override
  public boolean isLocal()
  {
    return level1.isLocal() && level2.isLocal();
  }

  @Override
  public void doMonitor(ServiceEmitter emitter)
  {
    level1.doMonitor(emitter);
    level2.doMonitor(emitter);
  }
}
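
A minimal usage sketch, mirroring the unit test later in this commit; the example class, `main` wrapper, and the two in-memory `MapCache` instances standing in for the L1 and L2 tiers are illustrative only:

```java
package io.druid.client.cache;

import com.google.common.primitives.Ints;

// Illustrative only: any two Cache implementations can back a HybridCache.
public class HybridCacheExample
{
  public static void main(String[] args)
  {
    final Cache l1 = new MapCache(new ByteCountingLRUMap(1024 * 1024));
    final Cache l2 = new MapCache(new ByteCountingLRUMap(1024 * 1024));
    final Cache cache = new HybridCache(l1, l2);

    final Cache.NamedKey key = new Cache.NamedKey("example", "hi".getBytes());
    cache.put(key, Ints.toByteArray(42)); // write-through: stored in both L1 and L2

    final byte[] value = cache.get(key);  // served from L1
    System.out.println(Ints.fromByteArray(value)); // prints 42
  }
}
```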

View File

@@ -0,0 +1,47 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.client.cache;

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.google.common.base.Preconditions;
import com.google.inject.name.Named;

public class HybridCacheProvider implements CacheProvider
{
  final CacheProvider level1;
  final CacheProvider level2;

  @JsonCreator
  public HybridCacheProvider(
      @JacksonInject @Named("l1") CacheProvider level1,
      @JacksonInject @Named("l2") CacheProvider level2
  )
  {
    this.level1 = Preconditions.checkNotNull(level1, "l1 cache not specified for hybrid cache");
    this.level2 = Preconditions.checkNotNull(level2, "l2 cache not specified for hybrid cache");
  }

  @Override
  public Cache get()
  {
    return new HybridCache(level1.get(), level2.get());
  }
}

View File

@@ -103,7 +103,10 @@ public class MapCache implements Cache
   {
     Map<NamedKey, byte[]> retVal = Maps.newHashMap();
     for (NamedKey key : keys) {
-      retVal.put(key, get(key));
+      final byte[] value = get(key);
+      if (value != null) {
+        retVal.put(key, value);
+      }
     }
     return retVal;
   }

View File

@@ -567,10 +567,12 @@ public class MemcachedCache implements Cache
         for (Map.Entry<String, Object> entry : some.entrySet()) {
           final NamedKey key = keyLookup.get(entry.getKey());
           final byte[] value = (byte[]) entry.getValue();
-          results.put(
-              key,
-              value == null ? null : deserializeValue(key, value)
-          );
+          if (value != null) {
+            results.put(
+                key,
+                deserializeValue(key, value)
+            );
+          }
         }
         return results;

View File

@@ -49,5 +49,24 @@ public class CacheModule implements Module
   {
     binder.bind(Cache.class).toProvider(Key.get(CacheProvider.class, Global.class)).in(ManageLifecycle.class);
     JsonConfigProvider.bind(binder, prefix, CacheProvider.class, Global.class);
+
+    binder.install(new HybridCacheModule(prefix));
   }
+
+  public static class HybridCacheModule implements Module
+  {
+    private final String prefix;
+
+    public HybridCacheModule(String prefix)
+    {
+      this.prefix = prefix;
+    }
+
+    @Override
+    public void configure(Binder binder)
+    {
+      JsonConfigProvider.bind(binder, prefix + ".l1", CacheProvider.class, Names.named("l1"));
+      JsonConfigProvider.bind(binder, prefix + ".l2", CacheProvider.class, Names.named("l2"));
+    }
+  }
 }

View File

@@ -0,0 +1,175 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.client.cache;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
import com.google.inject.Binder;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.name.Names;
import io.druid.guice.CacheModule;
import io.druid.guice.GuiceInjectors;
import io.druid.guice.annotations.Global;
import io.druid.initialization.Initialization;
import org.junit.Assert;
import org.junit.Test;

import java.util.HashSet;
import java.util.Map;

public class HybridCacheTest
{
  private static final byte[] HI = "hi".getBytes();

  @Test
  public void testInjection() throws Exception
  {
    final String prefix = "testInjectHybridCache";
    System.setProperty(prefix + ".type", "hybrid");
    System.setProperty(prefix + ".l1.type", "local");
    System.setProperty(prefix + ".l2.type", "memcached");
    System.setProperty(prefix + ".l2.hosts", "localhost:11711");

    final Injector injector = Initialization.makeInjectorWithModules(
        GuiceInjectors.makeStartupInjector(), ImmutableList.<Module>of(
            new Module()
            {
              @Override
              public void configure(Binder binder)
              {
                binder.bindConstant().annotatedWith(Names.named("serviceName")).to("hybridTest");
                binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0);
                binder.install(new CacheModule(prefix));
              }
            }
        )
    );

    final CacheProvider cacheProvider = injector.getInstance(Key.get(CacheProvider.class, Global.class));
    Assert.assertNotNull(cacheProvider);
    Assert.assertEquals(HybridCacheProvider.class, cacheProvider.getClass());

    final Cache cache = cacheProvider.get();
    Assert.assertNotNull(cache);
    Assert.assertFalse(cache.isLocal());
    Assert.assertEquals(LocalCacheProvider.class, ((HybridCacheProvider) cacheProvider).level1.getClass());
    Assert.assertEquals(MemcachedCacheProvider.class, ((HybridCacheProvider) cacheProvider).level2.getClass());
  }

  @Test
  public void testSanity() throws Exception
  {
    final MapCache l1 = new MapCache(new ByteCountingLRUMap(1024 * 1024));
    final MapCache l2 = new MapCache(new ByteCountingLRUMap(1024 * 1024));
    HybridCache cache = new HybridCache(l1, l2);

    final Cache.NamedKey key1 = new Cache.NamedKey("a", HI);
    final Cache.NamedKey key2 = new Cache.NamedKey("b", HI);
    final Cache.NamedKey key3 = new Cache.NamedKey("c", HI);
    final Cache.NamedKey key4 = new Cache.NamedKey("d", HI);

    final byte[] value1 = Ints.toByteArray(1);
    final byte[] value2 = Ints.toByteArray(2);
    final byte[] value3 = Ints.toByteArray(3);

    // test put puts to both
    cache.put(key1, value1);
    Assert.assertEquals(value1, l1.get(key1));
    Assert.assertEquals(value1, l2.get(key1));
    Assert.assertEquals(value1, cache.get(key1));

    int hits = 0;
    Assert.assertEquals(0, cache.getStats().getNumMisses());
    Assert.assertEquals(++hits, cache.getStats().getNumHits());

    // test l1
    l1.put(key2, value2);
    Assert.assertEquals(value2, cache.get(key2));
    Assert.assertEquals(0, cache.getStats().getNumMisses());
    Assert.assertEquals(++hits, cache.getStats().getNumHits());

    // test l2
    l2.put(key3, value3);
    Assert.assertEquals(value3, cache.get(key3));
    Assert.assertEquals(0, cache.getStats().getNumMisses());
    Assert.assertEquals(++hits, cache.getStats().getNumHits());

    // test bulk get with l1 and l2
    {
      final HashSet<Cache.NamedKey> keys = Sets.newHashSet(key1, key2, key3);
      Map<Cache.NamedKey, byte[]> res = cache.getBulk(keys);

      Assert.assertNotNull(res);
      Assert.assertEquals(keys, res.keySet());
      Assert.assertArrayEquals(value1, res.get(key1));
      Assert.assertArrayEquals(value2, res.get(key2));
      Assert.assertArrayEquals(value3, res.get(key3));
      hits += 3;
      Assert.assertEquals(0, cache.getStats().getNumMisses());
      Assert.assertEquals(hits, cache.getStats().getNumHits());
    }

    // test bulk get with l1 entries only
    {
      final HashSet<Cache.NamedKey> keys = Sets.newHashSet(key1, key2);
      Map<Cache.NamedKey, byte[]> res = cache.getBulk(keys);

      Assert.assertNotNull(res);
      Assert.assertEquals(keys, res.keySet());
      Assert.assertArrayEquals(value1, res.get(key1));
      Assert.assertArrayEquals(value2, res.get(key2));
      hits += 2;
      Assert.assertEquals(0, cache.getStats().getNumMisses());
      Assert.assertEquals(hits, cache.getStats().getNumHits());
    }

    int misses = 0;
    Assert.assertNull(cache.get(key4));
    Assert.assertEquals(++misses, cache.getStats().getNumMisses());

    Assert.assertTrue(cache.getBulk(Sets.newHashSet(key4)).isEmpty());
    Assert.assertEquals(++misses, cache.getStats().getNumMisses());

    {
      final Map<Cache.NamedKey, byte[]> res = cache.getBulk(Sets.newHashSet(key1, key4));
      Assert.assertEquals(Sets.newHashSet(key1), res.keySet());
      Assert.assertArrayEquals(value1, res.get(key1));
      Assert.assertEquals(++hits, cache.getStats().getNumHits());
      Assert.assertEquals(++misses, cache.getStats().getNumMisses());
    }

    {
      final Map<Cache.NamedKey, byte[]> res = cache.getBulk(Sets.newHashSet(key3, key4));
      Assert.assertEquals(Sets.newHashSet(key3), res.keySet());
      Assert.assertArrayEquals(value3, res.get(key3));
      Assert.assertEquals(++hits, cache.getStats().getNumHits());
      Assert.assertEquals(++misses, cache.getStats().getNumMisses());
    }
  }
}