Caffeine cache extension (#3028)

* Initial commit of caffeine cache

* Address code comments

* Move and fixup README.md a bit

* Improve caffeine readme information

* Cleanup caffeine pom

* Address review comments

* Bump caffeine to 2.3.1

* Bump druid version to 0.9.2-SNAPSHOT

* Make test not fail randomly.

See https://github.com/ben-manes/caffeine/pull/93#issuecomment-227617998 for an explanation

* Fix distribution and documentation

* Add caffeine to extensions.md

* Fix links in extensions.md

* Lexicographic
This commit is contained in:
Charles Allen 2016-07-06 15:42:54 -07:00 committed by Gian Merlino
parent b8a4f4ea7b
commit 3f1681c16c
12 changed files with 1080 additions and 3 deletions

View File

@ -152,4 +152,53 @@
</plugin>
</plugins>
</build>
<profiles>
<profile>
<id>java8</id>
<activation>
<jdk>1.8</jdk>
</activation>
<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<executions>
<execution>
<id>pull-deps-jdk8</id>
<phase>package</phase>
<goals>
<goal>exec</goal>
</goals>
<configuration>
<executable>java</executable>
<arguments>
<argument>-classpath</argument>
<classpath/>
<argument>-Ddruid.extensions.loadList=[]</argument>
<argument>-Ddruid.extensions.directory=${project.build.directory}/extensions
</argument>
<argument>
-Ddruid.extensions.hadoopDependenciesDir=${project.build.directory}/hadoop-dependencies
</argument>
<argument>io.druid.cli.Main</argument>
<argument>tools</argument>
<argument>pull-deps</argument>
<argument>--defaultVersion</argument>
<argument>${project.parent.version}</argument>
<argument>-l</argument>
<argument>${settings.localRepository}</argument>
<!-- Only need stuff here that is NOT included in the general release -->
<argument>--no-default-hadoop</argument>
<argument>-c</argument>
<argument>io.druid.extensions:druid-caffeine-cache</argument>
</arguments>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>

View File

@ -0,0 +1,39 @@
---
layout: doc_page
---
Druid Caffeine Cache
--------------------
A highly performant local cache implementation for Druid based on [Caffeine](https://github.com/ben-manes/caffeine). Requires a JRE8u60 or higher
# Configuration
Below are the configuration options known to this module:
|`runtime.properties`|Description|Default|
|--------------------|-----------|-------|
|`druid.cache.sizeInBytes`|The maximum size of the cache in bytes on heap.|None (unlimited)|
|`druid.cache.expireAfter`|The time (in ms) after an access for which a cache entry may be expired|None (no time limit)|
|`druid.cache.cacheExecutorFactory`|The executor factory to use for Caffeine maintenance. One of `COMMON_FJP`, `SINGLE_THREAD`, or `SAME_THREAD`|ForkJoinPool common pool (`COMMON_FJP`)|
|`druid.cache.evictOnClose`|If a close of a namespace (ex: removing a segment from a node) should cause an eager eviction of associated cache values|`false`|
## `druid.cache.cacheExecutorFactory`
Here are the possible values for `druid.cache.cacheExecutorFactory`, which controls how maintenance tasks are run
* `COMMON_FJP` (default) use the common ForkJoinPool. Do NOT use this option unless you are running 8u60 or higher
* `SINGLE_THREAD` Use a single-threaded executor
* `SAME_THREAD` Cache maintenance is done eagerly
# Enabling
To enable the caffeine cache, include this module on the loadList and set `druid.cache.type` to `caffeine` in your properties.
# Metrics
In addition to the normal cache metrics, the caffeine cache implementation also reports the following in both `total` and `delta`
|Metric|Description|Normal value|
|------|-----------|------------|
|`query/cache/caffeine/*/requests`|Count of hits or misses|hit + miss|
|`query/cache/caffeine/*/loadTime`|Length of time caffeine spends loading new values (unused feature)|0|
|`query/cache/caffeine/*/evictionBytes`|Size in bytes that have been evicted from the cache|Varies, should tune cache `sizeInBytes` so that `sizeInBytes`/`evictionBytes` is approximately the rate of cache churn you desire|

View File

@ -22,6 +22,7 @@ Core extensions are maintained by Druid committers.
|Name|Description|Docs|
|----|-----------|----|
|druid-avro-extensions|Support for data in Apache Avro data format.|[link](../development/extensions-core/avro.html)|
|druid-caffeine-cache|A local cache implementation backed by Caffeine.|[link](../development/extensions-core/caffeine-cache.html)|
|druid-datasketches|Support for approximate counts and set operations with [DataSketches](http://datasketches.github.io/).|[link](../development/extensions-core/datasketches-aggregators.html)|
|druid-hdfs-storage|HDFS deep storage.|[link](../development/extensions-core/hdfs.html)|
|druid-histogram|Approximate histograms and quantiles aggregator.|[link](../development/extensions-core/approximate-histograms.html)|

View File

@ -0,0 +1,80 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to Metamarkets Group Inc. (Metamarkets) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. Metamarkets licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>io.druid.extensions</groupId>
<artifactId>druid-caffeine-cache</artifactId>
<name>druid-caffeine-cache</name>
<description>Local cache implementation for Druid using Caffeine https://github.com/ben-manes/caffeine as the underlying implementation</description>
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.9.2-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<dependencies>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-api</artifactId>
<scope>provided</scope>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-server</artifactId>
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>2.3.1</version>
</dependency>
<dependency>
<groupId>net.jpountz.lz4</groupId>
<artifactId>lz4</artifactId>
<scope>provided</scope>
</dependency>
<!-- Tests -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,58 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.client.cache;
import com.fasterxml.jackson.annotation.JsonCreator;
import io.druid.concurrent.Execs;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
public enum CacheExecutorFactory
{
COMMON_FJP {
@Override
public Executor createExecutor()
{
return ForkJoinPool.commonPool();
}
},
SINGLE_THREAD {
@Override
public Executor createExecutor()
{
return Execs.singleThreaded("CaffeineWorker-%s");
}
},
SAME_THREAD {
@Override
public Executor createExecutor()
{
return Runnable::run;
}
};
public abstract Executor createExecutor();
@JsonCreator
public static CacheExecutorFactory from(String str)
{
return Enum.valueOf(CacheExecutorFactory.class, str.toUpperCase());
}
}

View File

@ -0,0 +1,192 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.client.cache;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.stats.CacheStats;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.primitives.Chars;
import com.google.common.primitives.Ints;
import com.metamx.common.logger.Logger;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import net.jpountz.lz4.LZ4Compressor;
import net.jpountz.lz4.LZ4Factory;
import net.jpountz.lz4.LZ4FastDecompressor;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.OptionalLong;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
public class CaffeineCache implements io.druid.client.cache.Cache
{
private static final Logger log = new Logger(CaffeineCache.class);
private static final int FIXED_COST = 8; // Minimum cost in "weight" per entry;
private static final LZ4Factory LZ4_FACTORY = LZ4Factory.fastestInstance();
private static final LZ4FastDecompressor LZ4_DECOMPRESSOR = LZ4_FACTORY.fastDecompressor();
private static final LZ4Compressor LZ4_COMPRESSOR = LZ4_FACTORY.fastCompressor();
private final Cache<NamedKey, byte[]> cache;
private final AtomicReference<CacheStats> priorStats = new AtomicReference<>(CacheStats.empty());
private final CaffeineCacheConfig config;
public static CaffeineCache create(final CaffeineCacheConfig config)
{
return create(config, config.createExecutor());
}
// Used in testing
public static CaffeineCache create(final CaffeineCacheConfig config, final Executor executor)
{
Caffeine<Object, Object> builder = Caffeine.newBuilder().recordStats();
if (config.getExpireAfter() >= 0) {
builder
.expireAfterAccess(config.getExpireAfter(), TimeUnit.MILLISECONDS);
}
if (config.getSizeInBytes() >= 0) {
builder
.maximumWeight(config.getSizeInBytes())
.weigher((NamedKey key, byte[] value) -> value.length
+ key.key.length
+ key.namespace.length() * Chars.BYTES
+ FIXED_COST);
}
builder.executor(executor);
return new CaffeineCache(builder.build(), config);
}
private CaffeineCache(final Cache<NamedKey, byte[]> cache, CaffeineCacheConfig config)
{
this.cache = cache;
this.config = config;
}
@Override
public byte[] get(NamedKey key)
{
return deserialize(cache.getIfPresent(key));
}
@Override
public void put(NamedKey key, byte[] value)
{
cache.put(key, serialize(value));
}
@Override
public Map<NamedKey, byte[]> getBulk(Iterable<NamedKey> keys)
{
// The assumption here is that every value is accessed at least once. Materializing here ensures deserialize is only
// called *once* per value.
return ImmutableMap.copyOf(Maps.transformValues(cache.getAllPresent(keys), this::deserialize));
}
// This is completely racy with put. Any values missed should be evicted later anyways. So no worries.
@Override
public void close(String namespace)
{
if (config.isEvictOnClose()) {
cache.asMap().keySet().removeIf(key -> key.namespace.equals(namespace));
}
}
@Override
public io.druid.client.cache.CacheStats getStats()
{
final com.github.benmanes.caffeine.cache.stats.CacheStats stats = cache.stats();
final long size = cache
.policy().eviction()
.map(eviction -> eviction.isWeighted() ? eviction.weightedSize() : OptionalLong.empty())
.orElse(OptionalLong.empty()).orElse(-1);
return new io.druid.client.cache.CacheStats(
stats.hitCount(),
stats.missCount(),
cache.estimatedSize(),
size,
stats.evictionCount(),
0,
stats.loadFailureCount()
);
}
@Override
public boolean isLocal()
{
return true;
}
@Override
public void doMonitor(ServiceEmitter emitter)
{
final CacheStats oldStats = priorStats.get();
final CacheStats newStats = cache.stats();
final CacheStats deltaStats = newStats.minus(oldStats);
final ServiceMetricEvent.Builder builder = ServiceMetricEvent.builder();
emitter.emit(builder.build("query/cache/caffeine/delta/requests", deltaStats.requestCount()));
emitter.emit(builder.build("query/cache/caffeine/total/requests", newStats.requestCount()));
emitter.emit(builder.build("query/cache/caffeine/delta/loadTime", deltaStats.totalLoadTime()));
emitter.emit(builder.build("query/cache/caffeine/total/loadTime", newStats.totalLoadTime()));
emitter.emit(builder.build("query/cache/caffeine/delta/evictionBytes", deltaStats.evictionWeight()));
emitter.emit(builder.build("query/cache/caffeine/total/evictionBytes", newStats.evictionWeight()));
if (!priorStats.compareAndSet(oldStats, newStats)) {
// ISE for stack trace
log.warn(
new IllegalStateException("Multiple monitors"),
"Multiple monitors on the same cache causing race conditions and unreliable stats reporting"
);
}
}
@VisibleForTesting
Cache<NamedKey, byte[]> getCache()
{
return cache;
}
private byte[] deserialize(byte[] bytes)
{
if (bytes == null) {
return null;
}
final int decompressedLen = ByteBuffer.wrap(bytes).getInt();
final byte[] out = new byte[decompressedLen];
LZ4_DECOMPRESSOR.decompress(bytes, Ints.BYTES, out, 0, out.length);
return out;
}
private byte[] serialize(byte[] value)
{
final int len = LZ4_COMPRESSOR.maxCompressedLength(value.length);
final byte[] out = new byte[len];
final int compressedSize = LZ4_COMPRESSOR.compress(value, 0, value.length, out, 0);
return ByteBuffer.allocate(compressedSize + Ints.BYTES)
.putInt(value.length)
.put(out, 0, compressedSize)
.array();
}
}

View File

@ -0,0 +1,60 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.client.cache;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.concurrent.Executor;
public class CaffeineCacheConfig
{
@JsonProperty
private long expireAfter = -1;
@JsonProperty
private long sizeInBytes = -1;
@JsonProperty
// Do not use COMMON_FJP unless you're running 8u60 or higher
// see https://github.com/ben-manes/caffeine/issues/77
private CacheExecutorFactory cacheExecutorFactory = CacheExecutorFactory.COMMON_FJP;
@JsonProperty
private boolean evictOnClose = false;
public long getExpireAfter()
{
return expireAfter;
}
public long getSizeInBytes()
{
return sizeInBytes;
}
public Executor createExecutor()
{
return cacheExecutorFactory.createExecutor();
}
public boolean isEvictOnClose()
{
return evictOnClose;
}
}

View File

@ -0,0 +1,32 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.client.cache;
import com.fasterxml.jackson.annotation.JsonTypeName;
@JsonTypeName("caffeine")
public class CaffeineCacheProvider extends CaffeineCacheConfig implements CacheProvider
{
@Override
public Cache get()
{
return CaffeineCache.create(this);
}
}

View File

@ -0,0 +1,47 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.client.cache;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import io.druid.initialization.DruidModule;
import java.util.List;
public class CaffeineDruidModule implements DruidModule
{
@Override
public void configure(Binder binder)
{
}
@Override
public List<? extends Module> getJacksonModules()
{
return ImmutableList.of(
new SimpleModule("DruidCaffeineCache")
.registerSubtypes(CaffeineCacheProvider.class)
);
}
}

View File

@ -0,0 +1 @@
io.druid.client.cache.CaffeineDruidModule

View File

@ -0,0 +1,500 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.client.cache;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.primitives.Ints;
import com.google.inject.Inject;
import com.google.inject.Injector;
import com.google.inject.Module;
import com.google.inject.name.Names;
import com.metamx.common.lifecycle.Lifecycle;
import io.druid.guice.GuiceInjectors;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.JsonConfigurator;
import io.druid.guice.ManageLifecycle;
import io.druid.initialization.Initialization;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ForkJoinPool;
public class CaffeineCacheTest
{
private static final int RANDOM_SEED = 3478178;
private static final byte[] HI = "hiiiiiiiiiiiiiiiiiii".getBytes();
private static final byte[] HO = "hooooooooooooooooooo".getBytes();
private CaffeineCache cache;
private final CaffeineCacheConfig cacheConfig = new CaffeineCacheConfig()
{
@Override
public boolean isEvictOnClose()
{
return true;
}
};
@Before
public void setUp() throws Exception
{
cache = CaffeineCache.create(cacheConfig);
}
@Test
public void testBasicInjection() throws Exception
{
final CaffeineCacheConfig config = new CaffeineCacheConfig();
Injector injector = Initialization.makeInjectorWithModules(
GuiceInjectors.makeStartupInjector(), ImmutableList.<Module>of(
binder -> {
binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/test/redis");
binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0);
binder.bind(CaffeineCacheConfig.class).toInstance(config);
binder.bind(Cache.class).toProvider(CaffeineCacheProviderWithConfig.class).in(ManageLifecycle.class);
}
)
);
final Lifecycle lifecycle = injector.getInstance(Lifecycle.class);
lifecycle.start();
try {
Cache cache = injector.getInstance(Cache.class);
Assert.assertEquals(CaffeineCache.class, cache.getClass());
}
finally {
lifecycle.stop();
}
}
@Test
public void testSimpleInjection()
{
final String uuid = UUID.randomUUID().toString();
System.setProperty(uuid + ".type", "caffeine");
final Injector injector = Initialization.makeInjectorWithModules(
GuiceInjectors.makeStartupInjector(), ImmutableList.<Module>of(
binder -> {
binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/test/redis");
binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0);
binder.bind(Cache.class).toProvider(CacheProvider.class);
JsonConfigProvider.bind(binder, uuid, CacheProvider.class);
}
)
);
final CacheProvider cacheProvider = injector.getInstance(CacheProvider.class);
Assert.assertNotNull(cacheProvider);
Assert.assertEquals(CaffeineCacheProvider.class, cacheProvider.getClass());
}
@Test
public void testBaseOps() throws Exception
{
final Cache.NamedKey aKey = new Cache.NamedKey("a", HI);
Assert.assertNull(cache.get(aKey));
put(cache, aKey, 1);
Assert.assertEquals(1, get(cache, aKey));
cache.close("a");
Assert.assertNull(cache.get(aKey));
final Cache.NamedKey hiKey = new Cache.NamedKey("the", HI);
final Cache.NamedKey hoKey = new Cache.NamedKey("the", HO);
put(cache, hiKey, 10);
put(cache, hoKey, 20);
Assert.assertEquals(10, get(cache, hiKey));
Assert.assertEquals(20, get(cache, hoKey));
cache.close("the");
Assert.assertNull(cache.get(hiKey));
Assert.assertNull(cache.get(hoKey));
Assert.assertNull(cache.get(new Cache.NamedKey("miss", HI)));
final CacheStats stats = cache.getStats();
Assert.assertEquals(3, stats.getNumHits());
Assert.assertEquals(5, stats.getNumMisses());
}
@Test
public void testGetBulk() throws Exception
{
Assert.assertNull(cache.get(new Cache.NamedKey("the", HI)));
Cache.NamedKey key1 = new Cache.NamedKey("the", HI);
put(cache, key1, 2);
Cache.NamedKey key2 = new Cache.NamedKey("the", HO);
put(cache, key2, 10);
Map<Cache.NamedKey, byte[]> result = cache.getBulk(
Lists.newArrayList(
key1,
key2
)
);
Assert.assertEquals(2, Ints.fromByteArray(result.get(key1)));
Assert.assertEquals(10, Ints.fromByteArray(result.get(key2)));
Cache.NamedKey missingKey = new Cache.NamedKey("missing", HI);
result = cache.getBulk(Lists.newArrayList(missingKey));
Assert.assertEquals(result.size(), 0);
result = cache.getBulk(Lists.<Cache.NamedKey>newArrayList());
Assert.assertEquals(result.size(), 0);
}
@Test
public void testSizeEviction() throws Exception
{
final CaffeineCacheConfig config = new CaffeineCacheConfig()
{
@Override
public long getSizeInBytes()
{
return 40;
}
};
final Random random = new Random(843671346794319L);
final byte[] val1 = new byte[14], val2 = new byte[14];
final byte[] s1 = new byte[]{0x01}, s2 = new byte[]{0x02};
random.nextBytes(val1);
random.nextBytes(val2);
final Cache.NamedKey key1 = new Cache.NamedKey("the", s1);
final Cache.NamedKey key2 = new Cache.NamedKey("the", s2);
final CaffeineCache cache = CaffeineCache.create(config, Runnable::run);
forceRandomSeed(cache);
Assert.assertNull(cache.get(key1));
Assert.assertNull(cache.get(key2));
cache.put(key1, val1);
Assert.assertArrayEquals(val1, cache.get(key1));
Assert.assertNull(cache.get(key2));
Assert.assertEquals(0, cache.getCache().stats().evictionWeight());
Assert.assertArrayEquals(val1, cache.get(key1));
Assert.assertNull(cache.get(key2));
cache.put(key2, val2);
Assert.assertNull(cache.get(key1));
Assert.assertArrayEquals(val2, cache.get(key2));
Assert.assertEquals(34, cache.getCache().stats().evictionWeight());
}
@Test
public void testSizeCalculation()
{
final CaffeineCacheConfig config = new CaffeineCacheConfig()
{
@Override
public long getSizeInBytes()
{
return 40;
}
};
final Random random = new Random(843671346794319L);
final byte[] val1 = new byte[14], val2 = new byte[14];
final byte[] s1 = new byte[]{0x01}, s2 = new byte[]{0x02};
random.nextBytes(val1);
random.nextBytes(val2);
final Cache.NamedKey key1 = new Cache.NamedKey("the", s1);
final Cache.NamedKey key2 = new Cache.NamedKey("the", s2);
final Cache cache = CaffeineCache.create(config, Runnable::run);
CacheStats stats = cache.getStats();
Assert.assertEquals(0L, stats.getNumEntries());
Assert.assertEquals(0L, stats.getSizeInBytes());
cache.put(key1, val1);
stats = cache.getStats();
Assert.assertEquals(1L, stats.getNumEntries());
Assert.assertEquals(34L, stats.getSizeInBytes());
cache.put(key2, val2);
stats = cache.getStats();
Assert.assertEquals(1L, stats.getNumEntries());
Assert.assertEquals(34L, stats.getSizeInBytes());
}
@Test
public void testSizeCalculationAfterDelete()
{
final String namespace = "the";
final CaffeineCacheConfig config = new CaffeineCacheConfig()
{
@Override
public long getSizeInBytes()
{
return 999999;
}
@Override
public boolean isEvictOnClose()
{
return true;
}
};
final Random random = new Random(843671346794319L);
final byte[] val1 = new byte[14], val2 = new byte[14];
final byte[] s1 = new byte[]{0x01}, s2 = new byte[]{0x02};
random.nextBytes(val1);
random.nextBytes(val2);
final Cache.NamedKey key1 = new Cache.NamedKey(namespace, s1);
final Cache.NamedKey key2 = new Cache.NamedKey(namespace, s2);
final Cache cache = CaffeineCache.create(config, Runnable::run);
CacheStats stats = cache.getStats();
Assert.assertEquals(0L, stats.getNumEntries());
Assert.assertEquals(0L, stats.getSizeInBytes());
cache.put(key1, val1);
stats = cache.getStats();
Assert.assertEquals(1L, stats.getNumEntries());
Assert.assertEquals(34L, stats.getSizeInBytes());
cache.put(key2, val2);
stats = cache.getStats();
Assert.assertEquals(2L, stats.getNumEntries());
Assert.assertEquals(68L, stats.getSizeInBytes());
cache.close(namespace);
stats = cache.getStats();
Assert.assertEquals(0, stats.getNumEntries());
Assert.assertEquals(0, stats.getSizeInBytes());
}
@Test
public void testSizeCalculationMore()
{
final CaffeineCacheConfig config = new CaffeineCacheConfig()
{
@Override
public long getSizeInBytes()
{
return 400;
}
};
final Random random = new Random(843671346794319L);
final byte[] val1 = new byte[14], val2 = new byte[14];
final byte[] s1 = new byte[]{0x01}, s2 = new byte[]{0x02};
random.nextBytes(val1);
random.nextBytes(val2);
final Cache.NamedKey key1 = new Cache.NamedKey("the", s1);
final Cache.NamedKey key2 = new Cache.NamedKey("the", s2);
final Cache cache = CaffeineCache.create(config, Runnable::run);
CacheStats stats = cache.getStats();
Assert.assertEquals(0L, stats.getNumEntries());
Assert.assertEquals(0L, stats.getSizeInBytes());
cache.put(key1, val1);
stats = cache.getStats();
Assert.assertEquals(1L, stats.getNumEntries());
Assert.assertEquals(34L, stats.getSizeInBytes());
cache.put(key2, val2);
stats = cache.getStats();
Assert.assertEquals(2L, stats.getNumEntries());
Assert.assertEquals(68L, stats.getSizeInBytes());
}
@Test
public void testSizeCalculationNoWeight()
{
final CaffeineCacheConfig config = new CaffeineCacheConfig()
{
@Override
public long getSizeInBytes()
{
return -1;
}
};
final Random random = new Random(843671346794319L);
final byte[] val1 = new byte[14], val2 = new byte[14];
final byte[] s1 = new byte[]{0x01}, s2 = new byte[]{0x02};
random.nextBytes(val1);
random.nextBytes(val2);
final Cache.NamedKey key1 = new Cache.NamedKey("the", s1);
final Cache.NamedKey key2 = new Cache.NamedKey("the", s2);
final CaffeineCache cache = CaffeineCache.create(config, Runnable::run);
CacheStats stats = cache.getStats();
Assert.assertEquals(0L, stats.getNumEntries());
Assert.assertEquals(-1L, stats.getSizeInBytes());
cache.put(key1, val1);
stats = cache.getStats();
Assert.assertEquals(1L, stats.getNumEntries());
Assert.assertEquals(-1L, stats.getSizeInBytes());
cache.put(key2, val2);
stats = cache.getStats();
Assert.assertEquals(2L, stats.getNumEntries());
Assert.assertEquals(-1L, stats.getSizeInBytes());
}
@Test
public void testFromProperties()
{
final String keyPrefix = "cache.config.prefix";
final Properties properties = new Properties();
properties.put(keyPrefix + ".expireAfter", "10");
properties.put(keyPrefix + ".sizeInBytes", "100");
properties.put(keyPrefix + ".cacheExecutorFactory", "single_thread");
final Injector injector = Initialization.makeInjectorWithModules(
GuiceInjectors.makeStartupInjector(),
ImmutableList.<Module>of(
binder -> {
binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/test");
binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0);
JsonConfigProvider.bind(binder, keyPrefix, CaffeineCacheConfig.class);
}
)
);
final JsonConfigurator configurator = injector.getInstance(JsonConfigurator.class);
final JsonConfigProvider<CaffeineCacheConfig> caffeineCacheConfigJsonConfigProvider = JsonConfigProvider.of(
keyPrefix,
CaffeineCacheConfig.class
);
caffeineCacheConfigJsonConfigProvider.inject(properties, configurator);
final CaffeineCacheConfig config = caffeineCacheConfigJsonConfigProvider.get().get();
Assert.assertEquals(10, config.getExpireAfter());
Assert.assertEquals(100, config.getSizeInBytes());
Assert.assertNotNull(config.createExecutor());
}
@Test
public void testMixedCaseFromProperties()
{
final String keyPrefix = "cache.config.prefix";
final Properties properties = new Properties();
properties.put(keyPrefix + ".expireAfter", "10");
properties.put(keyPrefix + ".sizeInBytes", "100");
properties.put(keyPrefix + ".cacheExecutorFactory", "CoMmON_FjP");
final Injector injector = Initialization.makeInjectorWithModules(
GuiceInjectors.makeStartupInjector(),
ImmutableList.<Module>of(
binder -> {
binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/test");
binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0);
JsonConfigProvider.bind(binder, keyPrefix, CaffeineCacheConfig.class);
}
)
);
final JsonConfigurator configurator = injector.getInstance(JsonConfigurator.class);
final JsonConfigProvider<CaffeineCacheConfig> caffeineCacheConfigJsonConfigProvider = JsonConfigProvider.of(
keyPrefix,
CaffeineCacheConfig.class
);
caffeineCacheConfigJsonConfigProvider.inject(properties, configurator);
final CaffeineCacheConfig config = caffeineCacheConfigJsonConfigProvider.get().get();
Assert.assertEquals(10, config.getExpireAfter());
Assert.assertEquals(100, config.getSizeInBytes());
Assert.assertEquals(ForkJoinPool.commonPool(), config.createExecutor());
}
@Test
public void testDefaultFromProperties()
{
final String keyPrefix = "cache.config.prefix";
final Properties properties = new Properties();
final Injector injector = Initialization.makeInjectorWithModules(
GuiceInjectors.makeStartupInjector(),
ImmutableList.<Module>of(
binder -> {
binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/test");
binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0);
JsonConfigProvider.bind(binder, keyPrefix, CaffeineCacheConfig.class);
}
)
);
final JsonConfigurator configurator = injector.getInstance(JsonConfigurator.class);
final JsonConfigProvider<CaffeineCacheConfig> caffeineCacheConfigJsonConfigProvider = JsonConfigProvider.of(
keyPrefix,
CaffeineCacheConfig.class
);
caffeineCacheConfigJsonConfigProvider.inject(properties, configurator);
final CaffeineCacheConfig config = caffeineCacheConfigJsonConfigProvider.get().get();
Assert.assertEquals(-1, config.getExpireAfter());
Assert.assertEquals(-1, config.getSizeInBytes());
Assert.assertEquals(ForkJoinPool.commonPool(), config.createExecutor());
}
public int get(Cache cache, Cache.NamedKey key)
{
return Ints.fromByteArray(cache.get(key));
}
public void put(Cache cache, Cache.NamedKey key, Integer value)
{
cache.put(key, Ints.toByteArray(value));
}
// See
public static void forceRandomSeed(CaffeineCache cache) throws Exception
{
final Map map = cache.getCache().asMap();
final Method getFrequencySketch = map.getClass().getDeclaredMethod("frequencySketch");
getFrequencySketch.setAccessible(true);
final Object frequencySketch = getFrequencySketch.invoke(map);
final Field seedField = frequencySketch.getClass().getDeclaredField("randomSeed");
seedField.setAccessible(true);
seedField.setInt(frequencySketch, RANDOM_SEED);
}
}
class CaffeineCacheProviderWithConfig extends CaffeineCacheProvider
{
private final CaffeineCacheConfig config;
@Inject
public CaffeineCacheProviderWithConfig(CaffeineCacheConfig config)
{
this.config = config;
}
@Override
public Cache get()
{
return CaffeineCache.create(config);
}
}

24
pom.xml
View File

@ -104,9 +104,6 @@
<module>extensions-contrib/distinctcount</module>
<module>extensions-contrib/parquet-extensions</module>
<module>extensions-contrib/statsd-emitter</module>
<!-- distribution packaging -->
<module>distribution</module>
</modules>
<dependencyManagement>
@ -833,5 +830,26 @@
</pluginManagement>
</build>
</profile>
<profile>
<id>java8</id>
<activation>
<jdk>1.8</jdk>
</activation>
<modules>
<module>extensions-core/caffeine-cache</module>
<!-- distribution packaging -->
<module>distribution</module>
</modules>
</profile>
<profile>
<id>java7</id>
<activation>
<jdk>1.7</jdk>
</activation>
<modules>
<!-- distribution packaging -->
<module>distribution</module>
</modules>
</profile>
</profiles>
</project>