From 3f1681c16c456c64837b843b20b5e0d232da0047 Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Wed, 6 Jul 2016 15:42:54 -0700 Subject: [PATCH] Caffeine cache extension (#3028) * Initial commit of caffeine cache * Address code comments * Move and fixup README.md a bit * Improve caffeine readme information * Cleanup caffeine pom * Address review comments * Bump caffeine to 2.3.1 * Bump druid version to 0.9.2-SNAPSHOT * Make test not fail randomly. See https://github.com/ben-manes/caffeine/pull/93#issuecomment-227617998 for an explanation * Fix distribution and documentation * Add caffeine to extensions.md * Fix links in extensions.md * Lexicographic --- distribution/pom.xml | 49 ++ .../extensions-core/caffeine-cache.md | 39 ++ docs/content/development/extensions.md | 1 + extensions-core/caffeine-cache/pom.xml | 80 +++ .../client/cache/CacheExecutorFactory.java | 58 ++ .../io/druid/client/cache/CaffeineCache.java | 192 +++++++ .../client/cache/CaffeineCacheConfig.java | 60 +++ .../client/cache/CaffeineCacheProvider.java | 32 ++ .../client/cache/CaffeineDruidModule.java | 47 ++ .../io.druid.initialization.DruidModule | 1 + .../druid/client/cache/CaffeineCacheTest.java | 500 ++++++++++++++++++ pom.xml | 24 +- 12 files changed, 1080 insertions(+), 3 deletions(-) create mode 100644 docs/content/development/extensions-core/caffeine-cache.md create mode 100644 extensions-core/caffeine-cache/pom.xml create mode 100644 extensions-core/caffeine-cache/src/main/java/io/druid/client/cache/CacheExecutorFactory.java create mode 100644 extensions-core/caffeine-cache/src/main/java/io/druid/client/cache/CaffeineCache.java create mode 100644 extensions-core/caffeine-cache/src/main/java/io/druid/client/cache/CaffeineCacheConfig.java create mode 100644 extensions-core/caffeine-cache/src/main/java/io/druid/client/cache/CaffeineCacheProvider.java create mode 100644 extensions-core/caffeine-cache/src/main/java/io/druid/client/cache/CaffeineDruidModule.java create mode 100644 extensions-core/caffeine-cache/src/main/resources/META-INF/services/io.druid.initialization.DruidModule create mode 100644 extensions-core/caffeine-cache/src/test/java/io/druid/client/cache/CaffeineCacheTest.java diff --git a/distribution/pom.xml b/distribution/pom.xml index a1db657935a..20038bf9033 100644 --- a/distribution/pom.xml +++ b/distribution/pom.xml @@ -152,4 +152,53 @@ + + + java8 + + 1.8 + + + + + org.codehaus.mojo + exec-maven-plugin + + + pull-deps-jdk8 + package + + exec + + + java + + -classpath + + -Ddruid.extensions.loadList=[] + -Ddruid.extensions.directory=${project.build.directory}/extensions + + + -Ddruid.extensions.hadoopDependenciesDir=${project.build.directory}/hadoop-dependencies + + io.druid.cli.Main + tools + pull-deps + --defaultVersion + ${project.parent.version} + -l + ${settings.localRepository} + + --no-default-hadoop + -c + io.druid.extensions:druid-caffeine-cache + + + + + + + + + diff --git a/docs/content/development/extensions-core/caffeine-cache.md b/docs/content/development/extensions-core/caffeine-cache.md new file mode 100644 index 00000000000..b9d739904b2 --- /dev/null +++ b/docs/content/development/extensions-core/caffeine-cache.md @@ -0,0 +1,39 @@ +--- +layout: doc_page +--- + +Druid Caffeine Cache +-------------------- + +A highly performant local cache implementation for Druid based on [Caffeine](https://github.com/ben-manes/caffeine). 
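For quick orientation before the option reference below, here is a minimal sketch of the relevant runtime properties. The values are purely illustrative (a roughly 256 MB on-heap cache with a ten-minute expiry), and the `loadList` entry assumes the extension directory is named `druid-caffeine-cache` and is loaded alongside whatever other extensions you already use; adjust both to your deployment.

```properties
# Load the extension and select it as the cache implementation
druid.extensions.loadList=["druid-caffeine-cache"]
druid.cache.type=caffeine

# Optional tuning knobs -- example values only
druid.cache.sizeInBytes=268435456
druid.cache.expireAfter=600000
druid.cache.cacheExecutorFactory=COMMON_FJP
druid.cache.evictOnClose=false
```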
Requires JRE 8u60 or higher.

# Configuration
Below are the configuration options known to this module:

|`runtime.properties`|Description|Default|
|--------------------|-----------|-------|
|`druid.cache.sizeInBytes`|The maximum size of the cache, in bytes, on heap.|None (unlimited)|
|`druid.cache.expireAfter`|The maximum time (in ms) an entry may remain in the cache after its last access before it is eligible for expiration.|None (no time limit)|
|`druid.cache.cacheExecutorFactory`|The executor factory to use for Caffeine maintenance. One of `COMMON_FJP`, `SINGLE_THREAD`, or `SAME_THREAD`.|ForkJoinPool common pool (`COMMON_FJP`)|
|`druid.cache.evictOnClose`|Whether closing a namespace (e.g. removing a segment from a node) eagerly evicts the associated cache entries.|`false`|

## `druid.cache.cacheExecutorFactory`

The possible values for `druid.cache.cacheExecutorFactory`, which controls how cache maintenance tasks are executed, are:

* `COMMON_FJP` (default): use the common ForkJoinPool. Do NOT use this option unless you are running JRE 8u60 or higher.
* `SINGLE_THREAD`: use a dedicated single-threaded executor.
* `SAME_THREAD`: perform cache maintenance eagerly on the calling thread.

# Enabling

To enable the Caffeine cache, include this extension in `druid.extensions.loadList` and set `druid.cache.type` to `caffeine` in your runtime properties, as sketched in the example above.

# Metrics
In addition to the normal cache metrics, the Caffeine cache implementation also reports the following, in both `total` and `delta` form:

|Metric|Description|Normal value|
|------|-----------|------------|
|`query/cache/caffeine/*/requests`|Count of hits plus misses.|hit + miss|
|`query/cache/caffeine/*/loadTime`|Length of time Caffeine spends loading new values (unused feature).|0|
|`query/cache/caffeine/*/evictionBytes`|Number of bytes that have been evicted from the cache.|Varies; tune `sizeInBytes` so that the ratio of `sizeInBytes` to `evictionBytes` reflects the rate of cache churn you desire.|
diff --git a/docs/content/development/extensions.md b/docs/content/development/extensions.md
index 14dbeb389cc..e95c397149d 100644
--- a/docs/content/development/extensions.md
+++ b/docs/content/development/extensions.md
@@ -22,6 +22,7 @@ Core extensions are maintained by Druid committers.
|Name|Description|Docs| |----|-----------|----| |druid-avro-extensions|Support for data in Apache Avro data format.|[link](../development/extensions-core/avro.html)| +|druid-caffeine-cache|A local cache implementation backed by Caffeine.|[link](../development/extensions-core/caffeine-cache.html)| |druid-datasketches|Support for approximate counts and set operations with [DataSketches](http://datasketches.github.io/).|[link](../development/extensions-core/datasketches-aggregators.html)| |druid-hdfs-storage|HDFS deep storage.|[link](../development/extensions-core/hdfs.html)| |druid-histogram|Approximate histograms and quantiles aggregator.|[link](../development/extensions-core/approximate-histograms.html)| diff --git a/extensions-core/caffeine-cache/pom.xml b/extensions-core/caffeine-cache/pom.xml new file mode 100644 index 00000000000..5d3da168ca5 --- /dev/null +++ b/extensions-core/caffeine-cache/pom.xml @@ -0,0 +1,80 @@ + + + + + + 4.0.0 + + io.druid.extensions + druid-caffeine-cache + druid-caffeine-cache + Local cache implementation for Druid using Caffeine https://github.com/ben-manes/caffeine as the underlying implementation + + + io.druid + druid + 0.9.2-SNAPSHOT + ../../pom.xml + + + + + io.druid + druid-api + provided + ${project.parent.version} + + + io.druid + druid-server + ${project.parent.version} + provided + + + com.github.ben-manes.caffeine + caffeine + 2.3.1 + + + net.jpountz.lz4 + lz4 + provided + + + + + junit + junit + test + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + 1.8 + 1.8 + + + + + diff --git a/extensions-core/caffeine-cache/src/main/java/io/druid/client/cache/CacheExecutorFactory.java b/extensions-core/caffeine-cache/src/main/java/io/druid/client/cache/CacheExecutorFactory.java new file mode 100644 index 00000000000..a2c26159882 --- /dev/null +++ b/extensions-core/caffeine-cache/src/main/java/io/druid/client/cache/CacheExecutorFactory.java @@ -0,0 +1,58 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.client.cache; + +import com.fasterxml.jackson.annotation.JsonCreator; +import io.druid.concurrent.Execs; +import java.util.concurrent.Executor; +import java.util.concurrent.ForkJoinPool; + +public enum CacheExecutorFactory +{ + COMMON_FJP { + @Override + public Executor createExecutor() + { + return ForkJoinPool.commonPool(); + } + }, + SINGLE_THREAD { + @Override + public Executor createExecutor() + { + return Execs.singleThreaded("CaffeineWorker-%s"); + } + }, + SAME_THREAD { + @Override + public Executor createExecutor() + { + return Runnable::run; + } + }; + + public abstract Executor createExecutor(); + + @JsonCreator + public static CacheExecutorFactory from(String str) + { + return Enum.valueOf(CacheExecutorFactory.class, str.toUpperCase()); + } +} diff --git a/extensions-core/caffeine-cache/src/main/java/io/druid/client/cache/CaffeineCache.java b/extensions-core/caffeine-cache/src/main/java/io/druid/client/cache/CaffeineCache.java new file mode 100644 index 00000000000..5903183a287 --- /dev/null +++ b/extensions-core/caffeine-cache/src/main/java/io/druid/client/cache/CaffeineCache.java @@ -0,0 +1,192 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.client.cache; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.stats.CacheStats; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; +import com.google.common.primitives.Chars; +import com.google.common.primitives.Ints; +import com.metamx.common.logger.Logger; +import com.metamx.emitter.service.ServiceEmitter; +import com.metamx.emitter.service.ServiceMetricEvent; +import net.jpountz.lz4.LZ4Compressor; +import net.jpountz.lz4.LZ4Factory; +import net.jpountz.lz4.LZ4FastDecompressor; + +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.OptionalLong; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +public class CaffeineCache implements io.druid.client.cache.Cache +{ + private static final Logger log = new Logger(CaffeineCache.class); + private static final int FIXED_COST = 8; // Minimum cost in "weight" per entry; + private static final LZ4Factory LZ4_FACTORY = LZ4Factory.fastestInstance(); + private static final LZ4FastDecompressor LZ4_DECOMPRESSOR = LZ4_FACTORY.fastDecompressor(); + private static final LZ4Compressor LZ4_COMPRESSOR = LZ4_FACTORY.fastCompressor(); + + private final Cache cache; + private final AtomicReference priorStats = new AtomicReference<>(CacheStats.empty()); + private final CaffeineCacheConfig config; + + + public static CaffeineCache create(final CaffeineCacheConfig config) + { + return create(config, config.createExecutor()); + } + + // Used in testing + public static CaffeineCache create(final CaffeineCacheConfig config, final Executor executor) + { + Caffeine builder = Caffeine.newBuilder().recordStats(); + if (config.getExpireAfter() >= 0) { + builder + .expireAfterAccess(config.getExpireAfter(), TimeUnit.MILLISECONDS); + } + if (config.getSizeInBytes() >= 0) { + builder + .maximumWeight(config.getSizeInBytes()) + .weigher((NamedKey key, byte[] value) -> value.length + + key.key.length + + key.namespace.length() * Chars.BYTES + + FIXED_COST); + } + builder.executor(executor); + return new CaffeineCache(builder.build(), config); + } + + private CaffeineCache(final Cache cache, CaffeineCacheConfig config) + { + this.cache = cache; + this.config = config; + } + + @Override + public byte[] get(NamedKey key) + { + return deserialize(cache.getIfPresent(key)); + } + + @Override + public void put(NamedKey key, byte[] value) + { + cache.put(key, serialize(value)); + } + + @Override + public Map getBulk(Iterable keys) + { + // The assumption here is that every value is accessed at least once. Materializing here ensures deserialize is only + // called *once* per value. + return ImmutableMap.copyOf(Maps.transformValues(cache.getAllPresent(keys), this::deserialize)); + } + + // This is completely racy with put. Any values missed should be evicted later anyways. So no worries. + @Override + public void close(String namespace) + { + if (config.isEvictOnClose()) { + cache.asMap().keySet().removeIf(key -> key.namespace.equals(namespace)); + } + } + + @Override + public io.druid.client.cache.CacheStats getStats() + { + final com.github.benmanes.caffeine.cache.stats.CacheStats stats = cache.stats(); + final long size = cache + .policy().eviction() + .map(eviction -> eviction.isWeighted() ? 
eviction.weightedSize() : OptionalLong.empty()) + .orElse(OptionalLong.empty()).orElse(-1); + return new io.druid.client.cache.CacheStats( + stats.hitCount(), + stats.missCount(), + cache.estimatedSize(), + size, + stats.evictionCount(), + 0, + stats.loadFailureCount() + ); + } + + @Override + public boolean isLocal() + { + return true; + } + + @Override + public void doMonitor(ServiceEmitter emitter) + { + final CacheStats oldStats = priorStats.get(); + final CacheStats newStats = cache.stats(); + final CacheStats deltaStats = newStats.minus(oldStats); + + final ServiceMetricEvent.Builder builder = ServiceMetricEvent.builder(); + emitter.emit(builder.build("query/cache/caffeine/delta/requests", deltaStats.requestCount())); + emitter.emit(builder.build("query/cache/caffeine/total/requests", newStats.requestCount())); + emitter.emit(builder.build("query/cache/caffeine/delta/loadTime", deltaStats.totalLoadTime())); + emitter.emit(builder.build("query/cache/caffeine/total/loadTime", newStats.totalLoadTime())); + emitter.emit(builder.build("query/cache/caffeine/delta/evictionBytes", deltaStats.evictionWeight())); + emitter.emit(builder.build("query/cache/caffeine/total/evictionBytes", newStats.evictionWeight())); + if (!priorStats.compareAndSet(oldStats, newStats)) { + // ISE for stack trace + log.warn( + new IllegalStateException("Multiple monitors"), + "Multiple monitors on the same cache causing race conditions and unreliable stats reporting" + ); + } + } + + @VisibleForTesting + Cache getCache() + { + return cache; + } + + private byte[] deserialize(byte[] bytes) + { + if (bytes == null) { + return null; + } + final int decompressedLen = ByteBuffer.wrap(bytes).getInt(); + final byte[] out = new byte[decompressedLen]; + LZ4_DECOMPRESSOR.decompress(bytes, Ints.BYTES, out, 0, out.length); + return out; + } + + private byte[] serialize(byte[] value) + { + final int len = LZ4_COMPRESSOR.maxCompressedLength(value.length); + final byte[] out = new byte[len]; + final int compressedSize = LZ4_COMPRESSOR.compress(value, 0, value.length, out, 0); + return ByteBuffer.allocate(compressedSize + Ints.BYTES) + .putInt(value.length) + .put(out, 0, compressedSize) + .array(); + } +} diff --git a/extensions-core/caffeine-cache/src/main/java/io/druid/client/cache/CaffeineCacheConfig.java b/extensions-core/caffeine-cache/src/main/java/io/druid/client/cache/CaffeineCacheConfig.java new file mode 100644 index 00000000000..50c5ed2875f --- /dev/null +++ b/extensions-core/caffeine-cache/src/main/java/io/druid/client/cache/CaffeineCacheConfig.java @@ -0,0 +1,60 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.client.cache; + +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.concurrent.Executor; + +public class CaffeineCacheConfig +{ + @JsonProperty + private long expireAfter = -1; + + @JsonProperty + private long sizeInBytes = -1; + + @JsonProperty + // Do not use COMMON_FJP unless you're running 8u60 or higher + // see https://github.com/ben-manes/caffeine/issues/77 + private CacheExecutorFactory cacheExecutorFactory = CacheExecutorFactory.COMMON_FJP; + + @JsonProperty + private boolean evictOnClose = false; + + public long getExpireAfter() + { + return expireAfter; + } + + public long getSizeInBytes() + { + return sizeInBytes; + } + + public Executor createExecutor() + { + return cacheExecutorFactory.createExecutor(); + } + + public boolean isEvictOnClose() + { + return evictOnClose; + } +} diff --git a/extensions-core/caffeine-cache/src/main/java/io/druid/client/cache/CaffeineCacheProvider.java b/extensions-core/caffeine-cache/src/main/java/io/druid/client/cache/CaffeineCacheProvider.java new file mode 100644 index 00000000000..23e18987fe7 --- /dev/null +++ b/extensions-core/caffeine-cache/src/main/java/io/druid/client/cache/CaffeineCacheProvider.java @@ -0,0 +1,32 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.client.cache; + +import com.fasterxml.jackson.annotation.JsonTypeName; + +@JsonTypeName("caffeine") +public class CaffeineCacheProvider extends CaffeineCacheConfig implements CacheProvider +{ + @Override + public Cache get() + { + return CaffeineCache.create(this); + } +} diff --git a/extensions-core/caffeine-cache/src/main/java/io/druid/client/cache/CaffeineDruidModule.java b/extensions-core/caffeine-cache/src/main/java/io/druid/client/cache/CaffeineDruidModule.java new file mode 100644 index 00000000000..20adb814bc4 --- /dev/null +++ b/extensions-core/caffeine-cache/src/main/java/io/druid/client/cache/CaffeineDruidModule.java @@ -0,0 +1,47 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.client.cache; + +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.google.common.collect.ImmutableList; +import com.google.inject.Binder; +import io.druid.initialization.DruidModule; + +import java.util.List; + +public class CaffeineDruidModule implements DruidModule +{ + + @Override + public void configure(Binder binder) + { + + } + + @Override + public List getJacksonModules() + { + return ImmutableList.of( + new SimpleModule("DruidCaffeineCache") + .registerSubtypes(CaffeineCacheProvider.class) + ); + } +} diff --git a/extensions-core/caffeine-cache/src/main/resources/META-INF/services/io.druid.initialization.DruidModule b/extensions-core/caffeine-cache/src/main/resources/META-INF/services/io.druid.initialization.DruidModule new file mode 100644 index 00000000000..016f13528ad --- /dev/null +++ b/extensions-core/caffeine-cache/src/main/resources/META-INF/services/io.druid.initialization.DruidModule @@ -0,0 +1 @@ +io.druid.client.cache.CaffeineDruidModule diff --git a/extensions-core/caffeine-cache/src/test/java/io/druid/client/cache/CaffeineCacheTest.java b/extensions-core/caffeine-cache/src/test/java/io/druid/client/cache/CaffeineCacheTest.java new file mode 100644 index 00000000000..373bea20ef3 --- /dev/null +++ b/extensions-core/caffeine-cache/src/test/java/io/druid/client/cache/CaffeineCacheTest.java @@ -0,0 +1,500 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.client.cache; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.primitives.Ints; +import com.google.inject.Inject; +import com.google.inject.Injector; +import com.google.inject.Module; +import com.google.inject.name.Names; +import com.metamx.common.lifecycle.Lifecycle; +import io.druid.guice.GuiceInjectors; +import io.druid.guice.JsonConfigProvider; +import io.druid.guice.JsonConfigurator; +import io.druid.guice.ManageLifecycle; +import io.druid.initialization.Initialization; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.util.Map; +import java.util.Properties; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.ForkJoinPool; + +public class CaffeineCacheTest +{ + private static final int RANDOM_SEED = 3478178; + private static final byte[] HI = "hiiiiiiiiiiiiiiiiiii".getBytes(); + private static final byte[] HO = "hooooooooooooooooooo".getBytes(); + + private CaffeineCache cache; + private final CaffeineCacheConfig cacheConfig = new CaffeineCacheConfig() + { + @Override + public boolean isEvictOnClose() + { + return true; + } + }; + + @Before + public void setUp() throws Exception + { + cache = CaffeineCache.create(cacheConfig); + } + + @Test + public void testBasicInjection() throws Exception + { + final CaffeineCacheConfig config = new CaffeineCacheConfig(); + Injector injector = Initialization.makeInjectorWithModules( + GuiceInjectors.makeStartupInjector(), ImmutableList.of( + binder -> { + binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/test/redis"); + binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0); + + binder.bind(CaffeineCacheConfig.class).toInstance(config); + binder.bind(Cache.class).toProvider(CaffeineCacheProviderWithConfig.class).in(ManageLifecycle.class); + } + ) + ); + final Lifecycle lifecycle = injector.getInstance(Lifecycle.class); + lifecycle.start(); + try { + Cache cache = injector.getInstance(Cache.class); + Assert.assertEquals(CaffeineCache.class, cache.getClass()); + } + finally { + lifecycle.stop(); + } + } + + @Test + public void testSimpleInjection() + { + final String uuid = UUID.randomUUID().toString(); + System.setProperty(uuid + ".type", "caffeine"); + final Injector injector = Initialization.makeInjectorWithModules( + GuiceInjectors.makeStartupInjector(), ImmutableList.of( + binder -> { + binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/test/redis"); + binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0); + + binder.bind(Cache.class).toProvider(CacheProvider.class); + JsonConfigProvider.bind(binder, uuid, CacheProvider.class); + } + ) + ); + final CacheProvider cacheProvider = injector.getInstance(CacheProvider.class); + Assert.assertNotNull(cacheProvider); + Assert.assertEquals(CaffeineCacheProvider.class, cacheProvider.getClass()); + } + + @Test + public void testBaseOps() throws Exception + { + final Cache.NamedKey aKey = new Cache.NamedKey("a", HI); + Assert.assertNull(cache.get(aKey)); + put(cache, aKey, 1); + Assert.assertEquals(1, get(cache, aKey)); + + cache.close("a"); + Assert.assertNull(cache.get(aKey)); + + final Cache.NamedKey hiKey = new Cache.NamedKey("the", HI); + final Cache.NamedKey hoKey = new Cache.NamedKey("the", HO); + put(cache, hiKey, 10); + put(cache, hoKey, 20); + Assert.assertEquals(10, get(cache, hiKey)); + 
Assert.assertEquals(20, get(cache, hoKey)); + cache.close("the"); + + Assert.assertNull(cache.get(hiKey)); + Assert.assertNull(cache.get(hoKey)); + + Assert.assertNull(cache.get(new Cache.NamedKey("miss", HI))); + + final CacheStats stats = cache.getStats(); + Assert.assertEquals(3, stats.getNumHits()); + Assert.assertEquals(5, stats.getNumMisses()); + } + + @Test + public void testGetBulk() throws Exception + { + Assert.assertNull(cache.get(new Cache.NamedKey("the", HI))); + + Cache.NamedKey key1 = new Cache.NamedKey("the", HI); + put(cache, key1, 2); + + Cache.NamedKey key2 = new Cache.NamedKey("the", HO); + put(cache, key2, 10); + + Map result = cache.getBulk( + Lists.newArrayList( + key1, + key2 + ) + ); + + Assert.assertEquals(2, Ints.fromByteArray(result.get(key1))); + Assert.assertEquals(10, Ints.fromByteArray(result.get(key2))); + + Cache.NamedKey missingKey = new Cache.NamedKey("missing", HI); + result = cache.getBulk(Lists.newArrayList(missingKey)); + Assert.assertEquals(result.size(), 0); + + result = cache.getBulk(Lists.newArrayList()); + Assert.assertEquals(result.size(), 0); + } + + @Test + public void testSizeEviction() throws Exception + { + final CaffeineCacheConfig config = new CaffeineCacheConfig() + { + @Override + public long getSizeInBytes() + { + return 40; + } + }; + final Random random = new Random(843671346794319L); + final byte[] val1 = new byte[14], val2 = new byte[14]; + final byte[] s1 = new byte[]{0x01}, s2 = new byte[]{0x02}; + random.nextBytes(val1); + random.nextBytes(val2); + final Cache.NamedKey key1 = new Cache.NamedKey("the", s1); + final Cache.NamedKey key2 = new Cache.NamedKey("the", s2); + final CaffeineCache cache = CaffeineCache.create(config, Runnable::run); + forceRandomSeed(cache); + + Assert.assertNull(cache.get(key1)); + Assert.assertNull(cache.get(key2)); + + cache.put(key1, val1); + Assert.assertArrayEquals(val1, cache.get(key1)); + Assert.assertNull(cache.get(key2)); + + Assert.assertEquals(0, cache.getCache().stats().evictionWeight()); + + Assert.assertArrayEquals(val1, cache.get(key1)); + Assert.assertNull(cache.get(key2)); + + cache.put(key2, val2); + Assert.assertNull(cache.get(key1)); + Assert.assertArrayEquals(val2, cache.get(key2)); + Assert.assertEquals(34, cache.getCache().stats().evictionWeight()); + } + + @Test + public void testSizeCalculation() + { + final CaffeineCacheConfig config = new CaffeineCacheConfig() + { + @Override + public long getSizeInBytes() + { + return 40; + } + }; + final Random random = new Random(843671346794319L); + final byte[] val1 = new byte[14], val2 = new byte[14]; + final byte[] s1 = new byte[]{0x01}, s2 = new byte[]{0x02}; + random.nextBytes(val1); + random.nextBytes(val2); + final Cache.NamedKey key1 = new Cache.NamedKey("the", s1); + final Cache.NamedKey key2 = new Cache.NamedKey("the", s2); + final Cache cache = CaffeineCache.create(config, Runnable::run); + + CacheStats stats = cache.getStats(); + Assert.assertEquals(0L, stats.getNumEntries()); + Assert.assertEquals(0L, stats.getSizeInBytes()); + + cache.put(key1, val1); + + stats = cache.getStats(); + Assert.assertEquals(1L, stats.getNumEntries()); + Assert.assertEquals(34L, stats.getSizeInBytes()); + + cache.put(key2, val2); + + stats = cache.getStats(); + Assert.assertEquals(1L, stats.getNumEntries()); + Assert.assertEquals(34L, stats.getSizeInBytes()); + } + + @Test + public void testSizeCalculationAfterDelete() + { + final String namespace = "the"; + final CaffeineCacheConfig config = new CaffeineCacheConfig() + { + @Override + public long 
getSizeInBytes() + { + return 999999; + } + + @Override + public boolean isEvictOnClose() + { + return true; + } + + }; + final Random random = new Random(843671346794319L); + final byte[] val1 = new byte[14], val2 = new byte[14]; + final byte[] s1 = new byte[]{0x01}, s2 = new byte[]{0x02}; + random.nextBytes(val1); + random.nextBytes(val2); + final Cache.NamedKey key1 = new Cache.NamedKey(namespace, s1); + final Cache.NamedKey key2 = new Cache.NamedKey(namespace, s2); + final Cache cache = CaffeineCache.create(config, Runnable::run); + + CacheStats stats = cache.getStats(); + Assert.assertEquals(0L, stats.getNumEntries()); + Assert.assertEquals(0L, stats.getSizeInBytes()); + + cache.put(key1, val1); + + stats = cache.getStats(); + Assert.assertEquals(1L, stats.getNumEntries()); + Assert.assertEquals(34L, stats.getSizeInBytes()); + + cache.put(key2, val2); + + stats = cache.getStats(); + Assert.assertEquals(2L, stats.getNumEntries()); + Assert.assertEquals(68L, stats.getSizeInBytes()); + + cache.close(namespace); + stats = cache.getStats(); + Assert.assertEquals(0, stats.getNumEntries()); + Assert.assertEquals(0, stats.getSizeInBytes()); + } + + + @Test + public void testSizeCalculationMore() + { + final CaffeineCacheConfig config = new CaffeineCacheConfig() + { + @Override + public long getSizeInBytes() + { + return 400; + } + }; + final Random random = new Random(843671346794319L); + final byte[] val1 = new byte[14], val2 = new byte[14]; + final byte[] s1 = new byte[]{0x01}, s2 = new byte[]{0x02}; + random.nextBytes(val1); + random.nextBytes(val2); + final Cache.NamedKey key1 = new Cache.NamedKey("the", s1); + final Cache.NamedKey key2 = new Cache.NamedKey("the", s2); + final Cache cache = CaffeineCache.create(config, Runnable::run); + + CacheStats stats = cache.getStats(); + Assert.assertEquals(0L, stats.getNumEntries()); + Assert.assertEquals(0L, stats.getSizeInBytes()); + + cache.put(key1, val1); + + stats = cache.getStats(); + Assert.assertEquals(1L, stats.getNumEntries()); + Assert.assertEquals(34L, stats.getSizeInBytes()); + + cache.put(key2, val2); + + stats = cache.getStats(); + Assert.assertEquals(2L, stats.getNumEntries()); + Assert.assertEquals(68L, stats.getSizeInBytes()); + } + + @Test + public void testSizeCalculationNoWeight() + { + final CaffeineCacheConfig config = new CaffeineCacheConfig() + { + @Override + public long getSizeInBytes() + { + return -1; + } + }; + final Random random = new Random(843671346794319L); + final byte[] val1 = new byte[14], val2 = new byte[14]; + final byte[] s1 = new byte[]{0x01}, s2 = new byte[]{0x02}; + random.nextBytes(val1); + random.nextBytes(val2); + final Cache.NamedKey key1 = new Cache.NamedKey("the", s1); + final Cache.NamedKey key2 = new Cache.NamedKey("the", s2); + final CaffeineCache cache = CaffeineCache.create(config, Runnable::run); + + CacheStats stats = cache.getStats(); + Assert.assertEquals(0L, stats.getNumEntries()); + Assert.assertEquals(-1L, stats.getSizeInBytes()); + + cache.put(key1, val1); + + stats = cache.getStats(); + Assert.assertEquals(1L, stats.getNumEntries()); + Assert.assertEquals(-1L, stats.getSizeInBytes()); + + cache.put(key2, val2); + + stats = cache.getStats(); + Assert.assertEquals(2L, stats.getNumEntries()); + Assert.assertEquals(-1L, stats.getSizeInBytes()); + } + + @Test + public void testFromProperties() + { + final String keyPrefix = "cache.config.prefix"; + final Properties properties = new Properties(); + properties.put(keyPrefix + ".expireAfter", "10"); + properties.put(keyPrefix + 
".sizeInBytes", "100"); + properties.put(keyPrefix + ".cacheExecutorFactory", "single_thread"); + final Injector injector = Initialization.makeInjectorWithModules( + GuiceInjectors.makeStartupInjector(), + ImmutableList.of( + binder -> { + binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/test"); + binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0); + JsonConfigProvider.bind(binder, keyPrefix, CaffeineCacheConfig.class); + } + ) + ); + final JsonConfigurator configurator = injector.getInstance(JsonConfigurator.class); + final JsonConfigProvider caffeineCacheConfigJsonConfigProvider = JsonConfigProvider.of( + keyPrefix, + CaffeineCacheConfig.class + ); + caffeineCacheConfigJsonConfigProvider.inject(properties, configurator); + final CaffeineCacheConfig config = caffeineCacheConfigJsonConfigProvider.get().get(); + Assert.assertEquals(10, config.getExpireAfter()); + Assert.assertEquals(100, config.getSizeInBytes()); + Assert.assertNotNull(config.createExecutor()); + } + + @Test + public void testMixedCaseFromProperties() + { + final String keyPrefix = "cache.config.prefix"; + final Properties properties = new Properties(); + properties.put(keyPrefix + ".expireAfter", "10"); + properties.put(keyPrefix + ".sizeInBytes", "100"); + properties.put(keyPrefix + ".cacheExecutorFactory", "CoMmON_FjP"); + final Injector injector = Initialization.makeInjectorWithModules( + GuiceInjectors.makeStartupInjector(), + ImmutableList.of( + binder -> { + binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/test"); + binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0); + JsonConfigProvider.bind(binder, keyPrefix, CaffeineCacheConfig.class); + } + ) + ); + final JsonConfigurator configurator = injector.getInstance(JsonConfigurator.class); + final JsonConfigProvider caffeineCacheConfigJsonConfigProvider = JsonConfigProvider.of( + keyPrefix, + CaffeineCacheConfig.class + ); + caffeineCacheConfigJsonConfigProvider.inject(properties, configurator); + final CaffeineCacheConfig config = caffeineCacheConfigJsonConfigProvider.get().get(); + Assert.assertEquals(10, config.getExpireAfter()); + Assert.assertEquals(100, config.getSizeInBytes()); + Assert.assertEquals(ForkJoinPool.commonPool(), config.createExecutor()); + } + + @Test + public void testDefaultFromProperties() + { + final String keyPrefix = "cache.config.prefix"; + final Properties properties = new Properties(); + final Injector injector = Initialization.makeInjectorWithModules( + GuiceInjectors.makeStartupInjector(), + ImmutableList.of( + binder -> { + binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/test"); + binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0); + JsonConfigProvider.bind(binder, keyPrefix, CaffeineCacheConfig.class); + } + ) + ); + final JsonConfigurator configurator = injector.getInstance(JsonConfigurator.class); + final JsonConfigProvider caffeineCacheConfigJsonConfigProvider = JsonConfigProvider.of( + keyPrefix, + CaffeineCacheConfig.class + ); + caffeineCacheConfigJsonConfigProvider.inject(properties, configurator); + final CaffeineCacheConfig config = caffeineCacheConfigJsonConfigProvider.get().get(); + Assert.assertEquals(-1, config.getExpireAfter()); + Assert.assertEquals(-1, config.getSizeInBytes()); + Assert.assertEquals(ForkJoinPool.commonPool(), config.createExecutor()); + } + + public int get(Cache cache, Cache.NamedKey key) + { + return Ints.fromByteArray(cache.get(key)); + } + + public void put(Cache cache, 
Cache.NamedKey key, Integer value) + { + cache.put(key, Ints.toByteArray(value)); + } + + // See + public static void forceRandomSeed(CaffeineCache cache) throws Exception + { + final Map map = cache.getCache().asMap(); + final Method getFrequencySketch = map.getClass().getDeclaredMethod("frequencySketch"); + getFrequencySketch.setAccessible(true); + final Object frequencySketch = getFrequencySketch.invoke(map); + final Field seedField = frequencySketch.getClass().getDeclaredField("randomSeed"); + seedField.setAccessible(true); + seedField.setInt(frequencySketch, RANDOM_SEED); + } +} + +class CaffeineCacheProviderWithConfig extends CaffeineCacheProvider +{ + private final CaffeineCacheConfig config; + + @Inject + public CaffeineCacheProviderWithConfig(CaffeineCacheConfig config) + { + this.config = config; + } + + @Override + public Cache get() + { + return CaffeineCache.create(config); + } +} diff --git a/pom.xml b/pom.xml index 8dd4f44860b..d7d2f5a37c4 100644 --- a/pom.xml +++ b/pom.xml @@ -104,9 +104,6 @@ extensions-contrib/distinctcount extensions-contrib/parquet-extensions extensions-contrib/statsd-emitter - - - distribution @@ -833,5 +830,26 @@ + + java8 + + 1.8 + + + extensions-core/caffeine-cache + + distribution + + + + java7 + + 1.7 + + + + distribution + +
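As a closing note on the value framing used by `CaffeineCache.serialize`/`deserialize` above: each stored value is a 4-byte big-endian prefix holding the uncompressed length, followed by the LZ4-compressed payload. Below is a minimal standalone sketch of the same round trip, assuming only `lz4-java` (`net.jpountz.lz4`) on the classpath; the class name `Lz4FramingSketch` is ours, not part of the patch.

```java
import net.jpountz.lz4.LZ4Compressor;
import net.jpountz.lz4.LZ4Factory;
import net.jpountz.lz4.LZ4FastDecompressor;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class Lz4FramingSketch
{
  private static final LZ4Factory FACTORY = LZ4Factory.fastestInstance();

  // Compress the value and prepend its uncompressed length as a 4-byte big-endian int.
  static byte[] serialize(byte[] value)
  {
    final LZ4Compressor compressor = FACTORY.fastCompressor();
    final byte[] out = new byte[compressor.maxCompressedLength(value.length)];
    final int compressedSize = compressor.compress(value, 0, value.length, out, 0);
    return ByteBuffer.allocate(Integer.BYTES + compressedSize)
                     .putInt(value.length)
                     .put(out, 0, compressedSize)
                     .array();
  }

  // Read the length prefix, then decompress the remainder into a buffer of exactly that size.
  static byte[] deserialize(byte[] framed)
  {
    final LZ4FastDecompressor decompressor = FACTORY.fastDecompressor();
    final int decompressedLen = ByteBuffer.wrap(framed).getInt();
    final byte[] out = new byte[decompressedLen];
    decompressor.decompress(framed, Integer.BYTES, out, 0, out.length);
    return out;
  }

  public static void main(String[] args)
  {
    final byte[] original = "hello caffeine cache".getBytes(StandardCharsets.UTF_8);
    final byte[] framed = serialize(original);
    System.out.println(Arrays.equals(original, deserialize(framed))); // prints "true"
  }
}
```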