diff --git a/docs/content/configuration/broker.md b/docs/content/configuration/broker.md index 2d442bff848..36953966a30 100644 --- a/docs/content/configuration/broker.md +++ b/docs/content/configuration/broker.md @@ -113,8 +113,9 @@ You can optionally only configure caching to be enabled on the broker by setting |`druid.broker.cache.useResultLevelCache`|true, false|Enable result level caching on the broker.|false| |`druid.broker.cache.populateResultLevelCache`|true, false|Populate the result level cache on the broker.|false| |`druid.broker.cache.resultLevelCacheLimit`|positive integer|Maximum size of query response that can be cached.|`Integer.MAX_VALUE`| -|`druid.broker.cache.unCacheable`|All druid query types|All query types to not cache.|`["groupBy", "select"]`| +|`druid.broker.cache.unCacheable`|All druid query types|All query types to not cache.|`["select"]`| |`druid.broker.cache.cacheBulkMergeLimit`|positive integer or 0|Queries with more segments than this number will not attempt to fetch from cache at the broker level, leaving potential caching fetches (and cache result merging) to the historicals|`Integer.MAX_VALUE`| +|`druid.broker.cache.maxEntrySize`|positive integer or -1|Maximum size of an individual cache entry (processed results for one segment), in bytes, or -1 for unlimited.|`1000000` (1MB)| See [cache configuration](caching.html) for how to configure cache settings. diff --git a/docs/content/configuration/historical.md b/docs/content/configuration/historical.md index 02dddb92e52..ae770eadaf3 100644 --- a/docs/content/configuration/historical.md +++ b/docs/content/configuration/historical.md @@ -97,7 +97,8 @@ You can optionally only configure caching to be enabled on the historical by set |--------|---------------|-----------|-------| |`druid.historical.cache.useCache`|true, false|Enable the cache on the historical.|false| |`druid.historical.cache.populateCache`|true, false|Populate the cache on the historical.|false| -|`druid.historical.cache.unCacheable`|All druid query types|All query types to not cache.|["groupBy", "select"]| +|`druid.historical.cache.unCacheable`|All druid query types|All query types to not cache.|["select"]| +|`druid.historical.cache.maxEntrySize`|positive integer or -1|Maximum size of an individual cache entry (processed results for one segment), in bytes, or -1 for unlimited.|`1000000` (1MB)| See [cache configuration](caching.html) for how to configure cache settings. diff --git a/docs/content/configuration/index.md b/docs/content/configuration/index.md index d4ca2dc3079..3d9f2622afe 100644 --- a/docs/content/configuration/index.md +++ b/docs/content/configuration/index.md @@ -353,16 +353,21 @@ You can enable caching of results at the broker, historical, or realtime level u |druid.(broker|historical|realtime).cache.unCacheable|All druid query types|All query types to not cache.|["groupBy", "select"]| |druid.(broker|historical|realtime).cache.useCache|true, false|Whether to use cache for getting query results.|false| |druid.(broker|historical|realtime).cache.populateCache|true, false|Whether to populate cache.|false| +|druid.(broker|historical|realtime).cache.maxEntrySize|positive integer or -1|Maximum size of an individual cache entry (processed results for one segment), in bytes, or -1 for unlimited.|`-1`| #### Local Cache +
+DEPRECATED: Use caffeine instead +
+ |Property|Description|Default| |--------|-----------|-------| |`druid.cache.sizeInBytes`|Maximum cache size in bytes. You must set this if you enabled populateCache/useCache, or else cache size of zero wouldn't really cache anything.|0| |`druid.cache.initialSize`|Initial size of the hashtable backing the cache.|500000| |`druid.cache.logEvictionCount`|If non-zero, log cache eviction every `logEvictionCount` items.|0| -#### Memcache +#### Memcached |Property|Description|Default| |--------|-----------|-------| @@ -372,6 +377,22 @@ You can enable caching of results at the broker, historical, or realtime level u |`druid.cache.maxObjectSize`|Maximum object size in bytes for a Memcached object.|52428800 (50 MB)| |`druid.cache.memcachedPrefix`|Key prefix for all keys in Memcached.|druid| +#### Caffeine Cache + +A highly performant local cache implementation for Druid based on [Caffeine](https://github.com/ben-manes/caffeine). Requires a JRE8u60 or higher if using `COMMON_FJP`. + +Below are the configuration options known to this module: + +|`runtime.properties`|Description|Default| +|--------------------|-----------|-------| +|`druid.cache.type`| Set this to `caffeine`|`local`| +|`druid.cache.sizeInBytes`|The maximum size of the cache in bytes on heap.|None (unlimited)| +|`druid.cache.expireAfter`|The time (in ms) after an access for which a cache entry may be expired|None (no time limit)| +|`druid.cache.cacheExecutorFactory`|The executor factory to use for Caffeine maintenance. One of `COMMON_FJP`, `SINGLE_THREAD`, or `SAME_THREAD`|ForkJoinPool common pool (`COMMON_FJP`)| +|`druid.cache.evictOnClose`|If a close of a namespace (ex: removing a segment from a node) should cause an eager eviction of associated cache values|`false`| + +See the [Caching documentation](caching.html) for more detail. + ### Indexing Service Discovery This config is used to find the [Indexing Service](../design/indexing-service.html) using Curator service discovery. Only required if you are actually running an indexing service. diff --git a/docs/content/configuration/realtime.md b/docs/content/configuration/realtime.md index 80fe9553fef..b75fca5879a 100644 --- a/docs/content/configuration/realtime.md +++ b/docs/content/configuration/realtime.md @@ -72,6 +72,7 @@ You can optionally configure caching to be enabled on the realtime node by setti |--------|---------------|-----------|-------| |`druid.realtime.cache.useCache`|true, false|Enable the cache on the realtime.|false| |`druid.realtime.cache.populateCache`|true, false|Populate the cache on the realtime.|false| -|`druid.realtime.cache.unCacheable`|All druid query types|All query types to not cache.|`["groupBy", "select"]`| +|`druid.realtime.cache.unCacheable`|All druid query types|All query types to not cache.|`["select"]`| +|`druid.realtime.cache.maxEntrySize`|positive integer or -1|Maximum size of an individual cache entry (processed results for one segment), in bytes, or -1 for unlimited.|`1000000` (1MB)| See [cache configuration](caching.html) for how to configure cache settings. diff --git a/docs/content/operations/metrics.md b/docs/content/operations/metrics.md index d2c802d0444..6dc3e0f9857 100644 --- a/docs/content/operations/metrics.md +++ b/docs/content/operations/metrics.md @@ -88,6 +88,9 @@ Available Metrics |`*/averageByte`|Average cache entry byte size.||Varies.| |`*/timeouts`|Number of cache timeouts.||0| |`*/errors`|Number of cache errors.||0| +|`*/put/ok`|Number of new cache entries successfully cached.||Varies, but more than zero.| +|`*/put/error`|Number of new cache entries that could not be cached due to errors.||Varies, but more than zero.| +|`*/put/oversized`|Number of potential new cache entries that were skipped due to being too large (based on `druid.{broker,historical,realtime}.cache.maxEntrySize` properties).||Varies.| #### Memcached only metrics diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java index a9d5c98197e..8cb63239757 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java @@ -247,7 +247,8 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler toolbox.getEmitter(), toolbox.getQueryExecutorService(), toolbox.getCache(), - toolbox.getCacheConfig() + toolbox.getCacheConfig(), + toolbox.getCachePopulatorStats() ); } diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java index 1e30a950d3c..1929bb45132 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -37,6 +37,7 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import io.druid.client.cache.CacheConfig; +import io.druid.client.cache.CachePopulatorStats; import io.druid.client.cache.MapCache; import io.druid.data.input.impl.DimensionsSpec; import io.druid.data.input.impl.FloatDimensionSchema; @@ -2147,6 +2148,7 @@ public class KafkaIndexTaskTest testUtils.getTestIndexIO(), MapCache.create(1024), new CacheConfig(), + new CachePopulatorStats(), testUtils.getTestIndexMergerV9(), EasyMock.createNiceMock(DruidNodeAnnouncer.class), EasyMock.createNiceMock(DruidNode.class), diff --git a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java index d63c2a1c641..e31969e2bae 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java @@ -27,6 +27,7 @@ import com.google.common.collect.Maps; import com.google.common.collect.Multimap; import com.google.common.collect.Multimaps; import com.google.inject.Provider; +import io.druid.client.cache.CachePopulatorStats; import io.druid.java.util.emitter.service.ServiceEmitter; import io.druid.java.util.metrics.MonitorScheduler; import io.druid.client.cache.Cache; @@ -89,6 +90,7 @@ public class TaskToolbox private final IndexIO indexIO; private final Cache cache; private final CacheConfig cacheConfig; + private final CachePopulatorStats cachePopulatorStats; private final IndexMergerV9 indexMergerV9; private final TaskReportFileWriter taskReportFileWriter; @@ -117,6 +119,7 @@ public class TaskToolbox IndexIO indexIO, Cache cache, CacheConfig cacheConfig, + CachePopulatorStats cachePopulatorStats, IndexMergerV9 indexMergerV9, DruidNodeAnnouncer druidNodeAnnouncer, DruidNode druidNode, @@ -144,6 +147,7 @@ public class TaskToolbox this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO"); this.cache = cache; this.cacheConfig = cacheConfig; + this.cachePopulatorStats = cachePopulatorStats; this.indexMergerV9 = Preconditions.checkNotNull(indexMergerV9, "Null IndexMergerV9"); this.druidNodeAnnouncer = druidNodeAnnouncer; this.druidNode = druidNode; @@ -268,6 +272,11 @@ public class TaskToolbox return cacheConfig; } + public CachePopulatorStats getCachePopulatorStats() + { + return cachePopulatorStats; + } + public IndexMergerV9 getIndexMergerV9() { return indexMergerV9; diff --git a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java index 95f1b3238cb..f858761ad40 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java @@ -23,10 +23,9 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import com.google.inject.Inject; import com.google.inject.Provider; -import io.druid.java.util.emitter.service.ServiceEmitter; -import io.druid.java.util.metrics.MonitorScheduler; import io.druid.client.cache.Cache; import io.druid.client.cache.CacheConfig; +import io.druid.client.cache.CachePopulatorStats; import io.druid.discovery.DataNodeService; import io.druid.discovery.DruidNodeAnnouncer; import io.druid.discovery.LookupNodeService; @@ -35,6 +34,8 @@ import io.druid.guice.annotations.RemoteChatHandler; import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.config.TaskConfig; import io.druid.indexing.common.task.Task; +import io.druid.java.util.emitter.service.ServiceEmitter; +import io.druid.java.util.metrics.MonitorScheduler; import io.druid.query.QueryRunnerFactoryConglomerate; import io.druid.segment.IndexIO; import io.druid.segment.IndexMergerV9; @@ -73,6 +74,7 @@ public class TaskToolboxFactory private final IndexIO indexIO; private final Cache cache; private final CacheConfig cacheConfig; + private final CachePopulatorStats cachePopulatorStats; private final IndexMergerV9 indexMergerV9; private final DruidNodeAnnouncer druidNodeAnnouncer; private final DruidNode druidNode; @@ -100,6 +102,7 @@ public class TaskToolboxFactory IndexIO indexIO, Cache cache, CacheConfig cacheConfig, + CachePopulatorStats cachePopulatorStats, IndexMergerV9 indexMergerV9, DruidNodeAnnouncer druidNodeAnnouncer, @RemoteChatHandler DruidNode druidNode, @@ -126,6 +129,7 @@ public class TaskToolboxFactory this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO"); this.cache = cache; this.cacheConfig = cacheConfig; + this.cachePopulatorStats = cachePopulatorStats; this.indexMergerV9 = indexMergerV9; this.druidNodeAnnouncer = druidNodeAnnouncer; this.druidNode = druidNode; @@ -157,6 +161,7 @@ public class TaskToolboxFactory indexIO, cache, cacheConfig, + cachePopulatorStats, indexMergerV9, druidNodeAnnouncer, druidNode, diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java index 6c367fd5211..c1ee50b23e9 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java @@ -706,7 +706,8 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements toolbox.getEmitter(), toolbox.getQueryExecutorService(), toolbox.getCache(), - toolbox.getCacheConfig() + toolbox.getCacheConfig(), + toolbox.getCachePopulatorStats() ); } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java index 37a7ccf1139..a496182b717 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java @@ -341,6 +341,7 @@ public class RealtimeIndexTask extends AbstractTask toolbox.getIndexIO(), toolbox.getCache(), toolbox.getCacheConfig(), + toolbox.getCachePopulatorStats(), toolbox.getObjectMapper() ); diff --git a/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java b/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java index d557e8cba96..abd088f1400 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; import io.druid.client.cache.Cache; import io.druid.client.cache.CacheConfig; +import io.druid.client.cache.CachePopulatorStats; import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.config.TaskConfig; import io.druid.indexing.common.task.NoopTestTaskFileWriter; @@ -111,6 +112,7 @@ public class TaskToolboxTest mockIndexIO, mockCache, mockCacheConfig, + new CachePopulatorStats(), mockIndexMergerV9, null, null, diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java index 9f4073bc4d3..9166265398e 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java @@ -30,6 +30,7 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import io.druid.client.cache.CacheConfig; +import io.druid.client.cache.CachePopulatorStats; import io.druid.client.cache.MapCache; import io.druid.common.config.NullHandling; import io.druid.data.input.Firehose; @@ -1500,6 +1501,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest testUtils.getTestIndexIO(), MapCache.create(1024), new CacheConfig(), + new CachePopulatorStats(), testUtils.getTestIndexMergerV9(), EasyMock.createNiceMock(DruidNodeAnnouncer.class), EasyMock.createNiceMock(DruidNode.class), diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/CompactionTaskTest.java index 99adee034b3..2288393bbe8 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/CompactionTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/CompactionTaskTest.java @@ -586,6 +586,7 @@ public class CompactionTaskTest indexIO, null, null, + null, new IndexMergerV9(objectMapper, indexIO, OffHeapMemorySegmentWriteOutMediumFactory.instance()), null, null, diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java index c05d2dc41dc..b15261c9eee 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java @@ -1514,6 +1514,7 @@ public class IndexTaskTest indexIO, null, null, + null, indexMergerV9, null, null, diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java index d3edfef86dd..36fe3a3a63d 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java @@ -31,6 +31,7 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import io.druid.client.cache.CacheConfig; +import io.druid.client.cache.CachePopulatorStats; import io.druid.client.cache.MapCache; import io.druid.common.config.NullHandling; import io.druid.data.input.Firehose; @@ -1080,6 +1081,7 @@ public class RealtimeIndexTaskTest testUtils.getTestIndexIO(), MapCache.create(1024), new CacheConfig(), + new CachePopulatorStats(), testUtils.getTestIndexMergerV9(), EasyMock.createNiceMock(DruidNodeAnnouncer.class), EasyMock.createNiceMock(DruidNode.class), diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/SameIntervalMergeTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/SameIntervalMergeTaskTest.java index 2e816f546bc..900f48e5164 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/SameIntervalMergeTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/SameIntervalMergeTaskTest.java @@ -252,6 +252,7 @@ public class SameIntervalMergeTaskTest indexIO, null, null, + null, EasyMock.createMock(IndexMergerV9.class), null, null, diff --git a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java index f2085eaeb22..60ef6932740 100644 --- a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java @@ -316,6 +316,7 @@ public class IngestSegmentFirehoseFactoryTest INDEX_IO, null, null, + null, INDEX_MERGER_V9, null, null, diff --git a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java index c8149e32d9d..469d4a867e4 100644 --- a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java @@ -340,6 +340,7 @@ public class IngestSegmentFirehoseFactoryTimelineTest INDEX_IO, null, null, + null, INDEX_MERGER_V9, null, null, diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java index 9a95b879515..a5cd1251213 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java @@ -99,6 +99,7 @@ public class SingleTaskBackgroundRunnerTest utils.getTestIndexIO(), null, null, + null, utils.getTestIndexMergerV9(), null, node, diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java index 615fe39cec6..dc41b869e37 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java @@ -34,6 +34,7 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Ordering; import com.google.common.util.concurrent.MoreExecutors; +import io.druid.client.cache.CachePopulatorStats; import io.druid.client.cache.MapCache; import io.druid.data.input.Firehose; import io.druid.data.input.FirehoseFactory; @@ -616,6 +617,7 @@ public class TaskLifecycleTest INDEX_IO, MapCache.create(0), FireDepartmentTest.NO_CACHE_CONFIG, + new CachePopulatorStats(), INDEX_MERGER_V9, EasyMock.createNiceMock(DruidNodeAnnouncer.class), EasyMock.createNiceMock(DruidNode.class), diff --git a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskManagerTest.java b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskManagerTest.java index cd7e85e4fce..ff528ea8d82 100644 --- a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskManagerTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskManagerTest.java @@ -125,6 +125,7 @@ public class WorkerTaskManagerTest indexIO, null, null, + null, indexMergerV9, null, null, diff --git a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java index 2b5374b1030..1af92ada5c6 100644 --- a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java @@ -187,6 +187,7 @@ public class WorkerTaskMonitorTest indexIO, null, null, + null, indexMergerV9, null, null, diff --git a/server/src/main/java/io/druid/client/CacheUtil.java b/server/src/main/java/io/druid/client/CacheUtil.java index 78ba5e8ac84..622ec0c8db9 100644 --- a/server/src/main/java/io/druid/client/CacheUtil.java +++ b/server/src/main/java/io/druid/client/CacheUtil.java @@ -19,9 +19,6 @@ package io.druid.client; -import com.fasterxml.jackson.core.JsonGenerator; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Throwables; import io.druid.client.cache.Cache; import io.druid.client.cache.CacheConfig; import io.druid.java.util.common.StringUtils; @@ -31,8 +28,6 @@ import io.druid.query.QueryContexts; import io.druid.query.SegmentDescriptor; import org.joda.time.Interval; -import java.io.ByteArrayOutputStream; -import java.io.IOException; import java.nio.ByteBuffer; public class CacheUtil @@ -57,23 +52,6 @@ public class CacheUtil ); } - public static void populate(Cache cache, ObjectMapper mapper, Cache.NamedKey key, Iterable results) - { - try { - ByteArrayOutputStream bytes = new ByteArrayOutputStream(); - try (JsonGenerator gen = mapper.getFactory().createGenerator(bytes)) { - for (Object result : results) { - gen.writeObject(result); - } - } - - cache.put(key, bytes.toByteArray()); - } - catch (IOException e) { - throw Throwables.propagate(e); - } - } - public static boolean useCacheOnBrokers( Query query, CacheStrategy> strategy, diff --git a/server/src/main/java/io/druid/client/CachingClusteredClient.java b/server/src/main/java/io/druid/client/CachingClusteredClient.java index 50f6389e2d4..d227dd4ace2 100644 --- a/server/src/main/java/io/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/io/druid/client/CachingClusteredClient.java @@ -32,17 +32,12 @@ import com.google.common.collect.RangeSet; import com.google.common.collect.Sets; import com.google.common.hash.Hasher; import com.google.common.hash.Hashing; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; import com.google.inject.Inject; import io.druid.client.cache.Cache; import io.druid.client.cache.CacheConfig; +import io.druid.client.cache.CachePopulator; import io.druid.client.selector.QueryableDruidServer; import io.druid.client.selector.ServerSelector; -import io.druid.guice.annotations.BackgroundCaching; import io.druid.guice.annotations.Smile; import io.druid.java.util.common.Intervals; import io.druid.java.util.common.Pair; @@ -88,8 +83,6 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.SortedMap; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ExecutorService; import java.util.function.UnaryOperator; import java.util.stream.Collectors; @@ -102,8 +95,8 @@ public class CachingClusteredClient implements QuerySegmentWalker private final TimelineServerView serverView; private final Cache cache; private final ObjectMapper objectMapper; + private final CachePopulator cachePopulator; private final CacheConfig cacheConfig; - private final ListeningExecutorService backgroundExecutorService; @Inject public CachingClusteredClient( @@ -111,7 +104,7 @@ public class CachingClusteredClient implements QuerySegmentWalker TimelineServerView serverView, Cache cache, @Smile ObjectMapper objectMapper, - @BackgroundCaching ExecutorService backgroundExecutorService, + CachePopulator cachePopulator, CacheConfig cacheConfig ) { @@ -119,13 +112,13 @@ public class CachingClusteredClient implements QuerySegmentWalker this.serverView = serverView; this.cache = cache; this.objectMapper = objectMapper; + this.cachePopulator = cachePopulator; this.cacheConfig = cacheConfig; - this.backgroundExecutorService = MoreExecutors.listeningDecorator(backgroundExecutorService); - if (cacheConfig.isQueryCacheable(Query.GROUP_BY)) { + if (cacheConfig.isQueryCacheable(Query.GROUP_BY) && (cacheConfig.isUseCache() || cacheConfig.isPopulateCache())) { log.warn( - "Even though groupBy caching is enabled, v2 groupBys will not be cached. " - + "Consider disabling cache on your broker and enabling it on your data nodes to enable v2 groupBy caching." + "Even though groupBy caching is enabled in your configuration, v2 groupBys will not be cached on the broker. " + + "Consider enabling caching on your data nodes if it is not already enabled." ); } @@ -218,7 +211,7 @@ public class CachingClusteredClient implements QuerySegmentWalker private final boolean isBySegment; private final int uncoveredIntervalsLimit; private final Query downstreamQuery; - private final Map cachePopulatorMap = Maps.newHashMap(); + private final Map cachePopulatorKeyMap = Maps.newHashMap(); SpecificQueryRunnable(final QueryPlus queryPlus, final Map responseContext) { @@ -420,7 +413,7 @@ public class CachingClusteredClient implements QuerySegmentWalker } else if (populateCache) { // otherwise, if populating cache, add segment to list of segments to cache final String segmentIdentifier = segment.getServer().getSegment().getIdentifier(); - addCachePopulator(segmentCacheKey, segmentIdentifier, segmentQueryInterval); + addCachePopulatorKey(segmentCacheKey, segmentIdentifier, segmentQueryInterval); } }); return alreadyCachedResults; @@ -453,22 +446,22 @@ public class CachingClusteredClient implements QuerySegmentWalker } } - private void addCachePopulator( + private void addCachePopulatorKey( Cache.NamedKey segmentCacheKey, String segmentIdentifier, Interval segmentQueryInterval ) { - cachePopulatorMap.put( + cachePopulatorKeyMap.put( StringUtils.format("%s_%s", segmentIdentifier, segmentQueryInterval), - new CachePopulator(cache, objectMapper, segmentCacheKey) + segmentCacheKey ); } @Nullable - private CachePopulator getCachePopulator(String segmentId, Interval segmentInterval) + private Cache.NamedKey getCachePopulatorKey(String segmentId, Interval segmentInterval) { - return cachePopulatorMap.get(StringUtils.format("%s_%s", segmentId, segmentInterval)); + return cachePopulatorKeyMap.get(StringUtils.format("%s_%s", segmentId, segmentInterval)); } private SortedMap> groupSegmentsByServer(Set segments) @@ -601,27 +594,19 @@ public class CachingClusteredClient implements QuerySegmentWalker responseContext ); final Function cacheFn = strategy.prepareForSegmentLevelCache(); + return resultsBySegments .map(result -> { final BySegmentResultValueClass resultsOfSegment = result.getValue(); - final CachePopulator cachePopulator = - getCachePopulator(resultsOfSegment.getSegmentId(), resultsOfSegment.getInterval()); - Sequence res = Sequences - .simple(resultsOfSegment.getResults()) - .map(r -> { - if (cachePopulator != null) { - // only compute cache data if populating cache - cachePopulator.cacheFutures.add(backgroundExecutorService.submit(() -> cacheFn.apply(r))); - } - return r; - }) - .map( - toolChest.makePreComputeManipulatorFn(downstreamQuery, MetricManipulatorFns.deserializing())::apply - ); - if (cachePopulator != null) { - res = res.withEffect(cachePopulator::populate, MoreExecutors.sameThreadExecutor()); + final Cache.NamedKey cachePopulatorKey = + getCachePopulatorKey(resultsOfSegment.getSegmentId(), resultsOfSegment.getInterval()); + Sequence res = Sequences.simple(resultsOfSegment.getResults()); + if (cachePopulatorKey != null) { + res = cachePopulator.wrap(res, cacheFn::apply, cache, cachePopulatorKey); } - return res; + return res.map( + toolChest.makePreComputeManipulatorFn(downstreamQuery, MetricManipulatorFns.deserializing())::apply + ); }) .flatMerge(seq -> seq, query.getResultOrdering()); } @@ -644,43 +629,4 @@ public class CachingClusteredClient implements QuerySegmentWalker return rhs; } } - - private class CachePopulator - { - private final Cache cache; - private final ObjectMapper mapper; - private final Cache.NamedKey key; - private final ConcurrentLinkedQueue> cacheFutures = new ConcurrentLinkedQueue<>(); - - CachePopulator(Cache cache, ObjectMapper mapper, Cache.NamedKey key) - { - this.cache = cache; - this.mapper = mapper; - this.key = key; - } - - public void populate() - { - Futures.addCallback( - Futures.allAsList(cacheFutures), - new FutureCallback>() - { - @Override - public void onSuccess(List cacheData) - { - CacheUtil.populate(cache, mapper, key, cacheData); - // Help out GC by making sure all references are gone - cacheFutures.clear(); - } - - @Override - public void onFailure(Throwable throwable) - { - log.error(throwable, "Background caching failed"); - } - }, - backgroundExecutorService - ); - } - } } diff --git a/server/src/main/java/io/druid/client/CachingQueryRunner.java b/server/src/main/java/io/druid/client/CachingQueryRunner.java index df9e8856709..422ea651578 100644 --- a/server/src/main/java/io/druid/client/CachingQueryRunner.java +++ b/server/src/main/java/io/druid/client/CachingQueryRunner.java @@ -23,18 +23,13 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; import com.google.common.base.Throwables; -import com.google.common.collect.Lists; -import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; -import com.google.common.util.concurrent.SettableFuture; import io.druid.client.cache.Cache; import io.druid.client.cache.CacheConfig; +import io.druid.client.cache.CachePopulator; import io.druid.java.util.common.guava.BaseSequence; import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; -import io.druid.java.util.common.logger.Logger; import io.druid.query.CacheStrategy; import io.druid.query.Query; import io.druid.query.QueryPlus; @@ -46,20 +41,19 @@ import java.io.IOException; import java.util.Collection; import java.util.Collections; import java.util.Iterator; +import java.util.LinkedList; import java.util.Map; -import java.util.concurrent.ExecutorService; public class CachingQueryRunner implements QueryRunner { - private static final Logger log = new Logger(CachingQueryRunner.class); private final String segmentIdentifier; private final SegmentDescriptor segmentDescriptor; private final QueryRunner base; private final QueryToolChest toolChest; private final Cache cache; private final ObjectMapper mapper; + private final CachePopulator cachePopulator; private final CacheConfig cacheConfig; - private final ListeningExecutorService backgroundExecutorService; public CachingQueryRunner( String segmentIdentifier, @@ -68,7 +62,7 @@ public class CachingQueryRunner implements QueryRunner Cache cache, QueryToolChest toolchest, QueryRunner base, - ExecutorService backgroundExecutorService, + CachePopulator cachePopulator, CacheConfig cacheConfig ) { @@ -78,7 +72,7 @@ public class CachingQueryRunner implements QueryRunner this.toolChest = toolchest; this.cache = cache; this.mapper = mapper; - this.backgroundExecutorService = MoreExecutors.listeningDecorator(backgroundExecutorService); + this.cachePopulator = cachePopulator; this.cacheConfig = cacheConfig; } @@ -140,56 +134,10 @@ public class CachingQueryRunner implements QueryRunner } } - final Collection> cacheFutures = Collections.synchronizedList(Lists.newLinkedList()); + final Collection> cacheFutures = Collections.synchronizedList(new LinkedList<>()); if (populateCache) { final Function cacheFn = strategy.prepareForSegmentLevelCache(); - - return Sequences.withEffect( - Sequences.map( - base.run(queryPlus, responseContext), - new Function() - { - @Override - public T apply(final T input) - { - final SettableFuture future = SettableFuture.create(); - cacheFutures.add(future); - backgroundExecutorService.submit( - new Runnable() - { - @Override - public void run() - { - try { - future.set(cacheFn.apply(input)); - } - catch (Exception e) { - // if there is exception, should setException to quit the caching processing - future.setException(e); - } - } - } - ); - return input; - } - } - ), - new Runnable() - { - @Override - public void run() - { - try { - CacheUtil.populate(cache, mapper, key, Futures.allAsList(cacheFutures).get()); - } - catch (Exception e) { - log.error(e, "Error while getting future for cache task"); - throw Throwables.propagate(e); - } - } - }, - backgroundExecutorService - ); + return cachePopulator.wrap(base.run(queryPlus, responseContext), value -> cacheFn.apply(value), cache, key); } else { return base.run(queryPlus, responseContext); } diff --git a/server/src/main/java/io/druid/client/cache/BackgroundCachePopulator.java b/server/src/main/java/io/druid/client/cache/BackgroundCachePopulator.java new file mode 100644 index 00000000000..d20fcf4f7e6 --- /dev/null +++ b/server/src/main/java/io/druid/client/cache/BackgroundCachePopulator.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.client.cache; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import io.druid.java.util.common.guava.Sequence; +import io.druid.java.util.common.guava.Sequences; +import io.druid.java.util.common.logger.Logger; + +import java.io.ByteArrayOutputStream; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.function.Function; + +public class BackgroundCachePopulator implements CachePopulator +{ + private static final Logger log = new Logger(BackgroundCachePopulator.class); + + private final ListeningExecutorService exec; + private final ObjectMapper objectMapper; + private final CachePopulatorStats cachePopulatorStats; + private final long maxEntrySize; + + public BackgroundCachePopulator( + final ExecutorService exec, + final ObjectMapper objectMapper, + final CachePopulatorStats cachePopulatorStats, + final long maxEntrySize + ) + { + this.exec = MoreExecutors.listeningDecorator(exec); + this.objectMapper = Preconditions.checkNotNull(objectMapper, "objectMapper"); + this.cachePopulatorStats = Preconditions.checkNotNull(cachePopulatorStats, "cachePopulatorStats"); + this.maxEntrySize = maxEntrySize; + } + + @Override + public Sequence wrap( + final Sequence sequence, + final Function cacheFn, + final Cache cache, + final Cache.NamedKey cacheKey + ) + { + final List> cacheFutures = new LinkedList<>(); + + final Sequence wrappedSequence = Sequences.map( + sequence, + input -> { + cacheFutures.add(exec.submit(() -> cacheFn.apply(input))); + return input; + } + ); + + return Sequences.withEffect( + wrappedSequence, + () -> { + Futures.addCallback( + Futures.allAsList(cacheFutures), + new FutureCallback>() + { + @Override + public void onSuccess(List results) + { + populateCache(cache, cacheKey, results); + // Help out GC by making sure all references are gone + cacheFutures.clear(); + } + + @Override + public void onFailure(Throwable t) + { + log.error(t, "Background caching failed"); + } + }, + exec + ); + }, + MoreExecutors.sameThreadExecutor() + ); + } + + private void populateCache( + final Cache cache, + final Cache.NamedKey cacheKey, + final List results + ) + { + try { + final ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + + try (JsonGenerator gen = objectMapper.getFactory().createGenerator(bytes)) { + for (CacheType result : results) { + gen.writeObject(result); + + if (maxEntrySize > 0 && bytes.size() > maxEntrySize) { + cachePopulatorStats.incrementOversized(); + return; + } + } + } + + if (maxEntrySize > 0 && bytes.size() > maxEntrySize) { + cachePopulatorStats.incrementOversized(); + return; + } + + cache.put(cacheKey, bytes.toByteArray()); + cachePopulatorStats.incrementOk(); + } + catch (Exception e) { + log.warn(e, "Could not populate cache"); + cachePopulatorStats.incrementError(); + } + } +} diff --git a/server/src/main/java/io/druid/client/cache/CacheConfig.java b/server/src/main/java/io/druid/client/cache/CacheConfig.java index 4b9f02033ca..7d8e35fd7c3 100644 --- a/server/src/main/java/io/druid/client/cache/CacheConfig.java +++ b/server/src/main/java/io/druid/client/cache/CacheConfig.java @@ -20,10 +20,10 @@ package io.druid.client.cache; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableList; import io.druid.query.Query; import javax.validation.constraints.Min; -import java.util.Arrays; import java.util.List; public class CacheConfig @@ -52,10 +52,13 @@ public class CacheConfig private int cacheBulkMergeLimit = Integer.MAX_VALUE; @JsonProperty - private int resultLevelCacheLimit = Integer.MAX_VALUE; + private int maxEntrySize = 1_000_000; @JsonProperty - private List unCacheable = Arrays.asList(Query.GROUP_BY, Query.SELECT); + private List unCacheable = ImmutableList.of(Query.SELECT); + + @JsonProperty + private int resultLevelCacheLimit = Integer.MAX_VALUE; public boolean isPopulateCache() { @@ -87,6 +90,11 @@ public class CacheConfig return cacheBulkMergeLimit; } + public int getMaxEntrySize() + { + return maxEntrySize; + } + public int getResultLevelCacheLimit() { return resultLevelCacheLimit; diff --git a/server/src/main/java/io/druid/client/cache/CacheMonitor.java b/server/src/main/java/io/druid/client/cache/CacheMonitor.java index 515555abee2..576f9dbfd2d 100644 --- a/server/src/main/java/io/druid/client/cache/CacheMonitor.java +++ b/server/src/main/java/io/druid/client/cache/CacheMonitor.java @@ -20,27 +20,24 @@ package io.druid.client.cache; import com.google.inject.Inject; +import io.druid.java.util.common.StringUtils; import io.druid.java.util.emitter.service.ServiceEmitter; import io.druid.java.util.emitter.service.ServiceMetricEvent; import io.druid.java.util.metrics.AbstractMonitor; -import io.druid.java.util.common.StringUtils; public class CacheMonitor extends AbstractMonitor { // package private for tests volatile Cache cache; + private final CachePopulatorStats cachePopulatorStats; private volatile CacheStats prevCacheStats = null; + private volatile CachePopulatorStats.Snapshot prevCachePopulatorStats = null; - public CacheMonitor() + @Inject + public CacheMonitor(final CachePopulatorStats cachePopulatorStats) { - } - - public CacheMonitor( - Cache cache - ) - { - this.cache = cache; + this.cachePopulatorStats = cachePopulatorStats; } // make it possible to enable CacheMonitor even if cache is not bound @@ -58,10 +55,16 @@ public class CacheMonitor extends AbstractMonitor final CacheStats currCacheStats = cache.getStats(); final CacheStats deltaCacheStats = currCacheStats.delta(prevCacheStats); - final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder(); - emitStats(emitter, "query/cache/delta", deltaCacheStats, builder); - emitStats(emitter, "query/cache/total", currCacheStats, builder); + final CachePopulatorStats.Snapshot currCachePopulatorStats = cachePopulatorStats.snapshot(); + final CachePopulatorStats.Snapshot deltaCachePopulatorStats = currCachePopulatorStats.delta( + prevCachePopulatorStats + ); + final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder(); + emitStats(emitter, "query/cache/delta", deltaCachePopulatorStats, deltaCacheStats, builder); + emitStats(emitter, "query/cache/total", currCachePopulatorStats, currCacheStats, builder); + + prevCachePopulatorStats = currCachePopulatorStats; prevCacheStats = currCacheStats; // Any custom cache statistics that need monitoring @@ -71,13 +74,15 @@ public class CacheMonitor extends AbstractMonitor } private void emitStats( - ServiceEmitter emitter, + final ServiceEmitter emitter, final String metricPrefix, - CacheStats cacheStats, - ServiceMetricEvent.Builder builder + final CachePopulatorStats.Snapshot cachePopulatorStats, + final CacheStats cacheStats, + final ServiceMetricEvent.Builder builder ) { if (cache != null) { + // Cache stats. emitter.emit(builder.build(StringUtils.format("%s/numEntries", metricPrefix), cacheStats.getNumEntries())); emitter.emit(builder.build(StringUtils.format("%s/sizeBytes", metricPrefix), cacheStats.getSizeInBytes())); emitter.emit(builder.build(StringUtils.format("%s/hits", metricPrefix), cacheStats.getNumHits())); @@ -87,6 +92,13 @@ public class CacheMonitor extends AbstractMonitor emitter.emit(builder.build(StringUtils.format("%s/averageBytes", metricPrefix), cacheStats.averageBytes())); emitter.emit(builder.build(StringUtils.format("%s/timeouts", metricPrefix), cacheStats.getNumTimeouts())); emitter.emit(builder.build(StringUtils.format("%s/errors", metricPrefix), cacheStats.getNumErrors())); + + // Cache populator stats. + emitter.emit(builder.build(StringUtils.format("%s/put/ok", metricPrefix), cachePopulatorStats.getNumOk())); + emitter.emit(builder.build(StringUtils.format("%s/put/error", metricPrefix), cachePopulatorStats.getNumError())); + emitter.emit( + builder.build(StringUtils.format("%s/put/oversized", metricPrefix), cachePopulatorStats.getNumOversized()) + ); } } } diff --git a/processing/src/main/java/io/druid/guice/annotations/BackgroundCaching.java b/server/src/main/java/io/druid/client/cache/CachePopulator.java similarity index 66% rename from processing/src/main/java/io/druid/guice/annotations/BackgroundCaching.java rename to server/src/main/java/io/druid/client/cache/CachePopulator.java index abd75149b76..5c8fc88776f 100644 --- a/processing/src/main/java/io/druid/guice/annotations/BackgroundCaching.java +++ b/server/src/main/java/io/druid/client/cache/CachePopulator.java @@ -17,21 +17,18 @@ * under the License. */ -package io.druid.guice.annotations; +package io.druid.client.cache; -import com.google.inject.BindingAnnotation; +import io.druid.java.util.common.guava.Sequence; -import java.lang.annotation.ElementType; -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; -import java.lang.annotation.Target; +import java.util.function.Function; -/** - * - */ -@BindingAnnotation -@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD}) -@Retention(RetentionPolicy.RUNTIME) -public @interface BackgroundCaching +public interface CachePopulator { + Sequence wrap( + Sequence sequence, + Function cacheFn, + Cache cache, + Cache.NamedKey cacheKey + ); } diff --git a/server/src/main/java/io/druid/client/cache/CachePopulatorStats.java b/server/src/main/java/io/druid/client/cache/CachePopulatorStats.java new file mode 100644 index 00000000000..f1bcd30d8dd --- /dev/null +++ b/server/src/main/java/io/druid/client/cache/CachePopulatorStats.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.client.cache; + +import java.util.concurrent.atomic.AtomicLong; + +/** + */ +public class CachePopulatorStats +{ + private final AtomicLong okCounter = new AtomicLong(); + private final AtomicLong errorCounter = new AtomicLong(); + private final AtomicLong oversizedCounter = new AtomicLong(); + + public void incrementOk() + { + okCounter.incrementAndGet(); + } + + public void incrementError() + { + errorCounter.incrementAndGet(); + } + + public void incrementOversized() + { + oversizedCounter.incrementAndGet(); + } + + public Snapshot snapshot() + { + return new Snapshot( + okCounter.get(), + errorCounter.get(), + oversizedCounter.get() + ); + } + + public static class Snapshot + { + private final long numOk; + private final long numError; + private final long numOversized; + + Snapshot(final long numOk, final long numError, final long numOversized) + { + this.numOk = numOk; + this.numError = numError; + this.numOversized = numOversized; + } + + public long getNumOk() + { + return numOk; + } + + public long getNumError() + { + return numError; + } + + public long getNumOversized() + { + return numOversized; + } + + public Snapshot delta(Snapshot oldSnapshot) + { + if (oldSnapshot == null) { + return this; + } else { + return new Snapshot( + numOk - oldSnapshot.numOk, + numError - oldSnapshot.numError, + numOversized - oldSnapshot.numOversized + ); + } + } + } +} diff --git a/server/src/main/java/io/druid/client/cache/ForegroundCachePopulator.java b/server/src/main/java/io/druid/client/cache/ForegroundCachePopulator.java new file mode 100644 index 00000000000..7e7fd5e386e --- /dev/null +++ b/server/src/main/java/io/druid/client/cache/ForegroundCachePopulator.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.client.cache; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import io.druid.java.util.common.guava.Sequence; +import io.druid.java.util.common.guava.SequenceWrapper; +import io.druid.java.util.common.guava.Sequences; +import io.druid.java.util.common.logger.Logger; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; + +public class ForegroundCachePopulator implements CachePopulator +{ + private static final Logger log = new Logger(ForegroundCachePopulator.class); + + private final Object lock = new Object(); + private final ObjectMapper objectMapper; + private final CachePopulatorStats cachePopulatorStats; + private final long maxEntrySize; + + public ForegroundCachePopulator( + final ObjectMapper objectMapper, + final CachePopulatorStats cachePopulatorStats, + final long maxEntrySize + ) + { + this.objectMapper = Preconditions.checkNotNull(objectMapper, "objectMapper"); + this.cachePopulatorStats = Preconditions.checkNotNull(cachePopulatorStats, "cachePopulatorStats"); + this.maxEntrySize = maxEntrySize; + } + + @Override + public Sequence wrap( + final Sequence sequence, + final Function cacheFn, + final Cache cache, + final Cache.NamedKey cacheKey + ) + { + final ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + final AtomicBoolean tooBig = new AtomicBoolean(false); + final JsonGenerator jsonGenerator; + + try { + jsonGenerator = objectMapper.getFactory().createGenerator(bytes); + } + catch (IOException e) { + throw new RuntimeException(e); + } + + return Sequences.wrap( + Sequences.map( + sequence, + input -> { + if (!tooBig.get()) { + synchronized (lock) { + try { + jsonGenerator.writeObject(cacheFn.apply(input)); + + // Not flushing jsonGenerator before checking this, but should be ok since Jackson buffers are + // typically just a few KB, and we don't want to waste cycles flushing. + if (maxEntrySize > 0 && bytes.size() > maxEntrySize) { + tooBig.set(true); + } + } + catch (IOException e) { + throw new RuntimeException(e); + } + } + } + + return input; + } + ), + new SequenceWrapper() + { + @Override + public void after(final boolean isDone, final Throwable thrown) throws Exception + { + synchronized (lock) { + jsonGenerator.close(); + + if (isDone) { + // Check tooBig, then check maxEntrySize one more time, after closing/flushing jsonGenerator. + if (tooBig.get() || (maxEntrySize > 0 && bytes.size() > maxEntrySize)) { + cachePopulatorStats.incrementOversized(); + return; + } + + try { + cache.put(cacheKey, bytes.toByteArray()); + cachePopulatorStats.incrementOk(); + } + catch (Exception e) { + log.warn(e, "Unable to write to cache"); + cachePopulatorStats.incrementError(); + } + } + } + } + } + ); + } +} diff --git a/server/src/main/java/io/druid/client/cache/HybridCache.java b/server/src/main/java/io/druid/client/cache/HybridCache.java index f732bee7c92..c25beff3da1 100644 --- a/server/src/main/java/io/druid/client/cache/HybridCache.java +++ b/server/src/main/java/io/druid/client/cache/HybridCache.java @@ -21,8 +21,8 @@ package io.druid.client.cache; import com.google.common.collect.Maps; import com.google.common.collect.Sets; -import io.druid.java.util.emitter.service.ServiceEmitter; import io.druid.java.util.common.logger.Logger; +import io.druid.java.util.emitter.service.ServiceEmitter; import javax.annotation.Nullable; import java.util.Collections; diff --git a/server/src/main/java/io/druid/guice/CacheModule.java b/server/src/main/java/io/druid/guice/CacheModule.java index 19d6e570c1b..488e52ca50c 100644 --- a/server/src/main/java/io/druid/guice/CacheModule.java +++ b/server/src/main/java/io/druid/guice/CacheModule.java @@ -24,6 +24,7 @@ import com.google.inject.Key; import com.google.inject.Module; import com.google.inject.name.Names; import io.druid.client.cache.Cache; +import io.druid.client.cache.CachePopulatorStats; import io.druid.client.cache.CacheProvider; import io.druid.guice.annotations.Global; @@ -48,6 +49,7 @@ public class CacheModule implements Module public void configure(Binder binder) { binder.bind(Cache.class).toProvider(Key.get(CacheProvider.class, Global.class)).in(ManageLifecycle.class); + binder.bind(CachePopulatorStats.class).in(LazySingleton.class); JsonConfigProvider.bind(binder, prefix, CacheProvider.class, Global.class); binder.install(new HybridCacheModule(prefix)); diff --git a/server/src/main/java/io/druid/guice/DruidProcessingModule.java b/server/src/main/java/io/druid/guice/DruidProcessingModule.java index c0c328737c1..ff9859d08ca 100644 --- a/server/src/main/java/io/druid/guice/DruidProcessingModule.java +++ b/server/src/main/java/io/druid/guice/DruidProcessingModule.java @@ -19,22 +19,26 @@ package io.druid.guice; -import com.google.common.util.concurrent.MoreExecutors; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.inject.Binder; import com.google.inject.Module; import com.google.inject.Provides; import com.google.inject.ProvisionException; +import io.druid.client.cache.BackgroundCachePopulator; import io.druid.client.cache.CacheConfig; +import io.druid.client.cache.CachePopulator; +import io.druid.client.cache.CachePopulatorStats; +import io.druid.client.cache.ForegroundCachePopulator; import io.druid.collections.BlockingPool; import io.druid.collections.DefaultBlockingPool; import io.druid.collections.NonBlockingPool; import io.druid.collections.StupidPool; import io.druid.common.utils.VMUtils; -import io.druid.guice.annotations.BackgroundCaching; import io.druid.guice.annotations.Global; import io.druid.guice.annotations.Merging; import io.druid.guice.annotations.Processing; +import io.druid.guice.annotations.Smile; import io.druid.java.util.common.StringUtils; import io.druid.java.util.common.concurrent.ExecutorServiceConfig; import io.druid.java.util.common.lifecycle.Lifecycle; @@ -64,14 +68,15 @@ public class DruidProcessingModule implements Module } @Provides - @BackgroundCaching @LazySingleton - public ExecutorService getBackgroundExecutorService( + public CachePopulator getCachePopulator( + @Smile ObjectMapper smileMapper, + CachePopulatorStats cachePopulatorStats, CacheConfig cacheConfig ) { if (cacheConfig.getNumBackgroundThreads() > 0) { - return Executors.newFixedThreadPool( + final ExecutorService exec = Executors.newFixedThreadPool( cacheConfig.getNumBackgroundThreads(), new ThreadFactoryBuilder() .setNameFormat("background-cacher-%d") @@ -79,8 +84,10 @@ public class DruidProcessingModule implements Module .setPriority(Thread.MIN_PRIORITY) .build() ); + + return new BackgroundCachePopulator(exec, smileMapper, cachePopulatorStats, cacheConfig.getMaxEntrySize()); } else { - return MoreExecutors.sameThreadExecutor(); + return new ForegroundCachePopulator(smileMapper, cachePopulatorStats, cacheConfig.getMaxEntrySize()); } } diff --git a/server/src/main/java/io/druid/guice/RouterProcessingModule.java b/server/src/main/java/io/druid/guice/RouterProcessingModule.java index 17d132645a6..4b2bbc1710a 100644 --- a/server/src/main/java/io/druid/guice/RouterProcessingModule.java +++ b/server/src/main/java/io/druid/guice/RouterProcessingModule.java @@ -22,16 +22,14 @@ package io.druid.guice; import com.google.inject.Binder; import com.google.inject.Module; import com.google.inject.Provides; -import io.druid.client.cache.CacheConfig; import io.druid.collections.BlockingPool; import io.druid.collections.DummyBlockingPool; import io.druid.collections.DummyNonBlockingPool; import io.druid.collections.NonBlockingPool; -import io.druid.java.util.common.concurrent.Execs; -import io.druid.guice.annotations.BackgroundCaching; import io.druid.guice.annotations.Global; import io.druid.guice.annotations.Merging; import io.druid.guice.annotations.Processing; +import io.druid.java.util.common.concurrent.Execs; import io.druid.java.util.common.concurrent.ExecutorServiceConfig; import io.druid.java.util.common.logger.Logger; import io.druid.query.DruidProcessingConfig; @@ -58,20 +56,6 @@ public class RouterProcessingModule implements Module MetricsModule.register(binder, ExecutorServiceMonitor.class); } - @Provides - @BackgroundCaching - @LazySingleton - public ExecutorService getBackgroundExecutorService(CacheConfig cacheConfig) - { - if (cacheConfig.getNumBackgroundThreads() > 0) { - log.error( - "numBackgroundThreads[%d] configured, that is ignored on Router", - cacheConfig.getNumBackgroundThreads() - ); - } - return Execs.dummy(); - } - @Provides @Processing @ManageLifecycle diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java index c0169e74371..5b79626880d 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java @@ -40,6 +40,7 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import io.druid.client.cache.Cache; import io.druid.client.cache.CacheConfig; +import io.druid.client.cache.CachePopulatorStats; import io.druid.common.guava.ThreadRenamingCallable; import io.druid.data.input.Committer; import io.druid.data.input.InputRow; @@ -166,7 +167,8 @@ public class AppenderatorImpl implements Appenderator IndexIO indexIO, IndexMerger indexMerger, Cache cache, - CacheConfig cacheConfig + CacheConfig cacheConfig, + CachePopulatorStats cachePopulatorStats ) { this.schema = Preconditions.checkNotNull(schema, "schema"); @@ -186,7 +188,8 @@ public class AppenderatorImpl implements Appenderator conglomerate, queryExecutorService, Preconditions.checkNotNull(cache, "cache"), - cacheConfig + cacheConfig, + cachePopulatorStats ); maxBytesTuningConfig = TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()); log.info("Created Appenderator for dataSource[%s].", schema.getDataSource()); diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderators.java b/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderators.java index 7651c40c54f..1e22cdd8513 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderators.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderators.java @@ -22,6 +22,7 @@ package io.druid.segment.realtime.appenderator; import com.fasterxml.jackson.databind.ObjectMapper; import io.druid.client.cache.Cache; import io.druid.client.cache.CacheConfig; +import io.druid.client.cache.CachePopulatorStats; import io.druid.java.util.common.StringUtils; import io.druid.java.util.emitter.service.ServiceEmitter; import io.druid.query.QueryRunnerFactoryConglomerate; @@ -52,7 +53,8 @@ public class Appenderators ServiceEmitter emitter, ExecutorService queryExecutorService, Cache cache, - CacheConfig cacheConfig + CacheConfig cacheConfig, + CachePopulatorStats cachePopulatorStats ) { return new AppenderatorImpl( @@ -68,7 +70,8 @@ public class Appenderators indexIO, indexMerger, cache, - cacheConfig + cacheConfig, + cachePopulatorStats ); } @@ -120,6 +123,7 @@ public class Appenderators indexIO, indexMerger, null, + null, null ); } diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/DefaultRealtimeAppenderatorFactory.java b/server/src/main/java/io/druid/segment/realtime/appenderator/DefaultRealtimeAppenderatorFactory.java index b6b188e801e..7d027d64aa2 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/DefaultRealtimeAppenderatorFactory.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/DefaultRealtimeAppenderatorFactory.java @@ -21,11 +21,11 @@ package io.druid.segment.realtime.appenderator; import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.databind.ObjectMapper; -import io.druid.java.util.emitter.service.ServiceEmitter; - import io.druid.client.cache.Cache; import io.druid.client.cache.CacheConfig; +import io.druid.client.cache.CachePopulatorStats; import io.druid.guice.annotations.Processing; +import io.druid.java.util.emitter.service.ServiceEmitter; import io.druid.query.QueryRunnerFactoryConglomerate; import io.druid.segment.IndexIO; import io.druid.segment.IndexMerger; @@ -51,6 +51,7 @@ public class DefaultRealtimeAppenderatorFactory implements AppenderatorFactory private final IndexMerger indexMerger; private final Cache cache; private final CacheConfig cacheConfig; + private final CachePopulatorStats cachePopulatorStats; public DefaultRealtimeAppenderatorFactory( @JacksonInject ServiceEmitter emitter, @@ -62,7 +63,8 @@ public class DefaultRealtimeAppenderatorFactory implements AppenderatorFactory @JacksonInject IndexIO indexIO, @JacksonInject IndexMerger indexMerger, @JacksonInject Cache cache, - @JacksonInject CacheConfig cacheConfig + @JacksonInject CacheConfig cacheConfig, + @JacksonInject CachePopulatorStats cachePopulatorStats ) { this.emitter = emitter; @@ -75,6 +77,7 @@ public class DefaultRealtimeAppenderatorFactory implements AppenderatorFactory this.indexMerger = indexMerger; this.cache = cache; this.cacheConfig = cacheConfig; + this.cachePopulatorStats = cachePopulatorStats; } @Override @@ -103,7 +106,8 @@ public class DefaultRealtimeAppenderatorFactory implements AppenderatorFactory emitter, queryExecutorService, cache, - cacheConfig + cacheConfig, + cachePopulatorStats ); } diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java b/server/src/main/java/io/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java index 6901ace8493..ef18be2dec9 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java @@ -24,15 +24,17 @@ import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.collect.Iterables; import com.google.common.util.concurrent.MoreExecutors; -import io.druid.java.util.emitter.EmittingLogger; -import io.druid.java.util.emitter.service.ServiceEmitter; import io.druid.client.CachingQueryRunner; import io.druid.client.cache.Cache; import io.druid.client.cache.CacheConfig; +import io.druid.client.cache.CachePopulatorStats; +import io.druid.client.cache.ForegroundCachePopulator; import io.druid.java.util.common.ISE; import io.druid.java.util.common.Pair; import io.druid.java.util.common.guava.CloseQuietly; import io.druid.java.util.common.guava.FunctionalIterable; +import io.druid.java.util.emitter.EmittingLogger; +import io.druid.java.util.emitter.service.ServiceEmitter; import io.druid.query.BySegmentQueryRunner; import io.druid.query.CPUTimeMetricQueryRunner; import io.druid.query.MetricsEmittingQueryRunner; @@ -76,6 +78,7 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker private final ExecutorService queryExecutorService; private final Cache cache; private final CacheConfig cacheConfig; + private final CachePopulatorStats cachePopulatorStats; public SinkQuerySegmentWalker( String dataSource, @@ -85,7 +88,8 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker QueryRunnerFactoryConglomerate conglomerate, ExecutorService queryExecutorService, Cache cache, - CacheConfig cacheConfig + CacheConfig cacheConfig, + CachePopulatorStats cachePopulatorStats ) { this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource"); @@ -96,6 +100,7 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker this.queryExecutorService = Preconditions.checkNotNull(queryExecutorService, "queryExecutorService"); this.cache = Preconditions.checkNotNull(cache, "cache"); this.cacheConfig = Preconditions.checkNotNull(cacheConfig, "cacheConfig"); + this.cachePopulatorStats = Preconditions.checkNotNull(cachePopulatorStats, "cachePopulatorStats"); if (!cache.isLocal()) { log.warn("Configured cache[%s] is not local, caching will not be enabled.", cache.getClass().getName()); @@ -235,7 +240,12 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker cache, toolChest, baseRunner, - MoreExecutors.sameThreadExecutor(), + // Always populate in foreground regardless of config + new ForegroundCachePopulator( + objectMapper, + cachePopulatorStats, + cacheConfig.getMaxEntrySize() + ), cacheConfig ); } else { diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumber.java index aefbdf0c3a7..2bdf3a3dba0 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumber.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumber.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Lists; import io.druid.client.cache.Cache; import io.druid.client.cache.CacheConfig; +import io.druid.client.cache.CachePopulatorStats; import io.druid.common.guava.ThreadRenamingCallable; import io.druid.java.util.common.DateTimes; import io.druid.java.util.common.StringUtils; @@ -73,6 +74,7 @@ public class FlushingPlumber extends RealtimePlumber IndexIO indexIO, Cache cache, CacheConfig cacheConfig, + CachePopulatorStats cachePopulatorStats, ObjectMapper objectMapper ) @@ -92,6 +94,7 @@ public class FlushingPlumber extends RealtimePlumber indexIO, cache, cacheConfig, + cachePopulatorStats, objectMapper ); diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumberSchool.java b/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumberSchool.java index ea5e85b9514..244b2f09feb 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumberSchool.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumberSchool.java @@ -24,10 +24,11 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; -import io.druid.java.util.emitter.service.ServiceEmitter; import io.druid.client.cache.Cache; import io.druid.client.cache.CacheConfig; +import io.druid.client.cache.CachePopulatorStats; import io.druid.guice.annotations.Processing; +import io.druid.java.util.emitter.service.ServiceEmitter; import io.druid.query.QueryRunnerFactoryConglomerate; import io.druid.segment.IndexIO; import io.druid.segment.IndexMergerV9; @@ -57,6 +58,7 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool private final IndexIO indexIO; private final Cache cache; private final CacheConfig cacheConfig; + private final CachePopulatorStats cachePopulatorStats; private final ObjectMapper objectMapper; @JsonCreator @@ -70,6 +72,7 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool @JacksonInject IndexIO indexIO, @JacksonInject Cache cache, @JacksonInject CacheConfig cacheConfig, + @JacksonInject CachePopulatorStats cachePopulatorStats, @JacksonInject ObjectMapper objectMapper ) { @@ -85,6 +88,7 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool indexIO, cache, cacheConfig, + cachePopulatorStats, objectMapper ); @@ -97,6 +101,7 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO"); this.cache = cache; this.cacheConfig = cacheConfig; + this.cachePopulatorStats = cachePopulatorStats; this.objectMapper = objectMapper; } @@ -122,6 +127,7 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool indexIO, cache, cacheConfig, + cachePopulatorStats, objectMapper ); } diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java index 1e7a5c8008c..bfef2c623cb 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java @@ -32,6 +32,7 @@ import com.google.common.collect.Lists; import com.google.common.primitives.Ints; import io.druid.client.cache.Cache; import io.druid.client.cache.CacheConfig; +import io.druid.client.cache.CachePopulatorStats; import io.druid.common.guava.ThreadRenamingCallable; import io.druid.common.guava.ThreadRenamingRunnable; import io.druid.common.utils.VMUtils; @@ -146,6 +147,7 @@ public class RealtimePlumber implements Plumber IndexIO indexIO, Cache cache, CacheConfig cacheConfig, + CachePopulatorStats cachePopulatorStats, ObjectMapper objectMapper ) { @@ -168,7 +170,8 @@ public class RealtimePlumber implements Plumber conglomerate, queryExecutorService, cache, - cacheConfig + cacheConfig, + cachePopulatorStats ); log.info("Creating plumber using rejectionPolicy[%s]", getRejectionPolicy()); diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java index ec3ed406af7..6ad7acadb32 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java @@ -23,10 +23,11 @@ import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; -import io.druid.java.util.emitter.service.ServiceEmitter; import io.druid.client.cache.Cache; import io.druid.client.cache.CacheConfig; +import io.druid.client.cache.CachePopulatorStats; import io.druid.guice.annotations.Processing; +import io.druid.java.util.emitter.service.ServiceEmitter; import io.druid.query.QueryRunnerFactoryConglomerate; import io.druid.segment.IndexIO; import io.druid.segment.IndexMergerV9; @@ -54,6 +55,7 @@ public class RealtimePlumberSchool implements PlumberSchool private final IndexIO indexIO; private final Cache cache; private final CacheConfig cacheConfig; + private final CachePopulatorStats cachePopulatorStats; private final ObjectMapper objectMapper; @JsonCreator @@ -69,6 +71,7 @@ public class RealtimePlumberSchool implements PlumberSchool @JacksonInject IndexIO indexIO, @JacksonInject Cache cache, @JacksonInject CacheConfig cacheConfig, + @JacksonInject CachePopulatorStats cachePopulatorStats, @JacksonInject ObjectMapper objectMapper ) { @@ -84,6 +87,7 @@ public class RealtimePlumberSchool implements PlumberSchool this.cache = cache; this.cacheConfig = cacheConfig; + this.cachePopulatorStats = cachePopulatorStats; this.objectMapper = objectMapper; } @@ -111,6 +115,7 @@ public class RealtimePlumberSchool implements PlumberSchool indexIO, cache, cacheConfig, + cachePopulatorStats, objectMapper ); } diff --git a/server/src/main/java/io/druid/server/coordination/ServerManager.java b/server/src/main/java/io/druid/server/coordination/ServerManager.java index aac2e860234..4a134d7f11d 100644 --- a/server/src/main/java/io/druid/server/coordination/ServerManager.java +++ b/server/src/main/java/io/druid/server/coordination/ServerManager.java @@ -26,7 +26,7 @@ import com.google.inject.Inject; import io.druid.client.CachingQueryRunner; import io.druid.client.cache.Cache; import io.druid.client.cache.CacheConfig; -import io.druid.guice.annotations.BackgroundCaching; +import io.druid.client.cache.CachePopulator; import io.druid.guice.annotations.Processing; import io.druid.guice.annotations.Smile; import io.druid.java.util.common.ISE; @@ -77,7 +77,7 @@ public class ServerManager implements QuerySegmentWalker private final QueryRunnerFactoryConglomerate conglomerate; private final ServiceEmitter emitter; private final ExecutorService exec; - private final ExecutorService cachingExec; + private final CachePopulator cachePopulator; private final Cache cache; private final ObjectMapper objectMapper; private final CacheConfig cacheConfig; @@ -89,7 +89,7 @@ public class ServerManager implements QuerySegmentWalker QueryRunnerFactoryConglomerate conglomerate, ServiceEmitter emitter, @Processing ExecutorService exec, - @BackgroundCaching ExecutorService cachingExec, + CachePopulator cachePopulator, @Smile ObjectMapper objectMapper, Cache cache, CacheConfig cacheConfig, @@ -101,7 +101,7 @@ public class ServerManager implements QuerySegmentWalker this.emitter = emitter; this.exec = exec; - this.cachingExec = cachingExec; + this.cachePopulator = cachePopulator; this.cache = cache; this.objectMapper = objectMapper; @@ -298,7 +298,7 @@ public class ServerManager implements QuerySegmentWalker cache, toolChest, metricsEmittingQueryRunnerInner, - cachingExec, + cachePopulator, cacheConfig ); diff --git a/server/src/test/java/io/druid/client/CachingClusteredClientFunctionalityTest.java b/server/src/test/java/io/druid/client/CachingClusteredClientFunctionalityTest.java index f50eea7e97d..67ac01c0130 100644 --- a/server/src/test/java/io/druid/client/CachingClusteredClientFunctionalityTest.java +++ b/server/src/test/java/io/druid/client/CachingClusteredClientFunctionalityTest.java @@ -21,10 +21,11 @@ package io.druid.client; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Ordering; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; import io.druid.client.cache.Cache; +import io.druid.client.cache.CachePopulator; import io.druid.client.cache.CacheConfig; +import io.druid.client.cache.CachePopulatorStats; +import io.druid.client.cache.ForegroundCachePopulator; import io.druid.client.cache.MapCache; import io.druid.client.selector.QueryableDruidServer; import io.druid.client.selector.ServerSelector; @@ -74,7 +75,9 @@ public class CachingClusteredClientFunctionalityTest timeline = new VersionedIntervalTimeline<>(Ordering.natural()); serverView = EasyMock.createNiceMock(TimelineServerView.class); cache = MapCache.create(100000); - client = makeClient(MoreExecutors.sameThreadExecutor()); + client = makeClient( + new ForegroundCachePopulator(CachingClusteredClientTest.jsonMapper, new CachePopulatorStats(), -1) + ); } @Test @@ -199,13 +202,13 @@ public class CachingClusteredClientFunctionalityTest )); } - protected CachingClusteredClient makeClient(final ListeningExecutorService backgroundExecutorService) + protected CachingClusteredClient makeClient(final CachePopulator cachePopulator) { - return makeClient(backgroundExecutorService, cache, 10); + return makeClient(cachePopulator, cache, 10); } protected CachingClusteredClient makeClient( - final ListeningExecutorService backgroundExecutorService, + final CachePopulator cachePopulator, final Cache cache, final int mergeLimit ) @@ -245,7 +248,7 @@ public class CachingClusteredClientFunctionalityTest }, cache, CachingClusteredClientTest.jsonMapper, - backgroundExecutorService, + cachePopulator, new CacheConfig() { @Override diff --git a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java index 31f68ffbb64..6143dad7902 100644 --- a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java +++ b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java @@ -43,8 +43,12 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; +import io.druid.client.cache.BackgroundCachePopulator; import io.druid.client.cache.Cache; import io.druid.client.cache.CacheConfig; +import io.druid.client.cache.CachePopulator; +import io.druid.client.cache.CachePopulatorStats; +import io.druid.client.cache.ForegroundCachePopulator; import io.druid.client.cache.MapCache; import io.druid.client.selector.HighestPriorityTierSelectorStrategy; import io.druid.client.selector.QueryableDruidServer; @@ -330,7 +334,7 @@ public class CachingClusteredClientTest timeline = new VersionedIntervalTimeline<>(Ordering.natural()); serverView = EasyMock.createNiceMock(TimelineServerView.class); cache = MapCache.create(100000); - client = makeClient(MoreExecutors.sameThreadExecutor()); + client = makeClient(new ForegroundCachePopulator(jsonMapper, new CachePopulatorStats(), -1)); servers = new DruidServer[]{ new DruidServer("test1", "test1", null, 10, ServerType.HISTORICAL, "bye", 0), @@ -422,7 +426,14 @@ public class CachingClusteredClientTest } }; - client = makeClient(randomizingExecutorService); + client = makeClient( + new BackgroundCachePopulator( + randomizingExecutorService, + jsonMapper, + new CachePopulatorStats(), + -1 + ) + ); // callback to be run every time a query run is complete, to ensure all background // caching tasks are executed, and cache is populated before we move onto the next query @@ -579,7 +590,7 @@ public class CachingClusteredClientTest .andReturn(ImmutableMap.of()) .once(); EasyMock.replay(cache); - client = makeClient(MoreExecutors.sameThreadExecutor(), cache, limit); + client = makeClient(new ForegroundCachePopulator(jsonMapper, new CachePopulatorStats(), -1), cache, limit); final DruidServer lastServer = servers[random.nextInt(servers.length)]; final DataSegment dataSegment = EasyMock.createNiceMock(DataSegment.class); EasyMock.expect(dataSegment.getIdentifier()).andReturn(DATA_SOURCE).anyTimes(); @@ -604,7 +615,7 @@ public class CachingClusteredClientTest .andReturn(ImmutableMap.of()) .once(); EasyMock.replay(cache); - client = makeClient(MoreExecutors.sameThreadExecutor(), cache, 0); + client = makeClient(new ForegroundCachePopulator(jsonMapper, new CachePopulatorStats(), -1), cache, 0); getDefaultQueryRunner().run(QueryPlus.wrap(query), context); EasyMock.verify(cache); EasyMock.verify(dataSegment); @@ -2630,13 +2641,13 @@ public class CachingClusteredClientTest EasyMock.reset(mocks); } - protected CachingClusteredClient makeClient(final ListeningExecutorService backgroundExecutorService) + protected CachingClusteredClient makeClient(final CachePopulator cachePopulator) { - return makeClient(backgroundExecutorService, cache, 10); + return makeClient(cachePopulator, cache, 10); } protected CachingClusteredClient makeClient( - final ListeningExecutorService backgroundExecutorService, + final CachePopulator cachePopulator, final Cache cache, final int mergeLimit ) @@ -2676,7 +2687,7 @@ public class CachingClusteredClientTest }, cache, jsonMapper, - backgroundExecutorService, + cachePopulator, new CacheConfig() { @Override diff --git a/server/src/test/java/io/druid/client/CachingQueryRunnerTest.java b/server/src/test/java/io/druid/client/CachingQueryRunnerTest.java index ab2bf224836..012fe2b9be2 100644 --- a/server/src/test/java/io/druid/client/CachingQueryRunnerTest.java +++ b/server/src/test/java/io/druid/client/CachingQueryRunnerTest.java @@ -19,20 +19,26 @@ package io.druid.client; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; -import com.google.common.util.concurrent.MoreExecutors; +import io.druid.client.cache.BackgroundCachePopulator; import io.druid.client.cache.Cache; import io.druid.client.cache.CacheConfig; +import io.druid.client.cache.CachePopulator; +import io.druid.client.cache.CachePopulatorStats; import io.druid.client.cache.CacheStats; +import io.druid.client.cache.ForegroundCachePopulator; import io.druid.client.cache.MapCache; import io.druid.jackson.DefaultObjectMapper; import io.druid.java.util.common.DateTimes; import io.druid.java.util.common.ISE; import io.druid.java.util.common.Intervals; +import io.druid.java.util.common.concurrent.Execs; import io.druid.java.util.common.granularity.Granularities; import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.SequenceWrapper; @@ -63,15 +69,15 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import java.io.ByteArrayOutputStream; import java.io.Closeable; +import java.io.IOException; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -98,14 +104,22 @@ public class CachingQueryRunnerTest DateTimes.of("2011-01-09"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983 }; - private ExecutorService backgroundExecutorService; + private ObjectMapper objectMapper; + private CachePopulator cachePopulator; public CachingQueryRunnerTest(int numBackgroundThreads) { + objectMapper = new DefaultObjectMapper(); + if (numBackgroundThreads > 0) { - backgroundExecutorService = Executors.newFixedThreadPool(numBackgroundThreads); + cachePopulator = new BackgroundCachePopulator( + Execs.multiThreaded(numBackgroundThreads, "CachingQueryRunnerTest-%d"), + objectMapper, + new CachePopulatorStats(), + -1 + ); } else { - backgroundExecutorService = MoreExecutors.sameThreadExecutor(); + cachePopulator = new ForegroundCachePopulator(objectMapper, new CachePopulatorStats(), -1); } } @@ -274,7 +288,7 @@ public class CachingQueryRunnerTest return resultSeq; } }, - backgroundExecutorService, + cachePopulator, new CacheConfig() { @Override @@ -331,7 +345,7 @@ public class CachingQueryRunnerTest List expectedResults, Query query, QueryToolChest toolchest - ) + ) throws IOException { DefaultObjectMapper objectMapper = new DefaultObjectMapper(); String segmentIdentifier = "segment"; @@ -345,12 +359,7 @@ public class CachingQueryRunnerTest ); Cache cache = MapCache.create(1024 * 1024); - CacheUtil.populate( - cache, - objectMapper, - cacheKey, - Iterables.transform(expectedResults, cacheStrategy.prepareForSegmentLevelCache()) - ); + cache.put(cacheKey, toByteArray(Iterables.transform(expectedResults, cacheStrategy.prepareForSegmentLevelCache()))); CachingQueryRunner runner = new CachingQueryRunner( segmentIdentifier, @@ -367,7 +376,7 @@ public class CachingQueryRunnerTest return Sequences.empty(); } }, - backgroundExecutorService, + cachePopulator, new CacheConfig() { @Override @@ -434,6 +443,19 @@ public class CachingQueryRunnerTest return retVal; } + private byte[] toByteArray(final Iterable results) throws IOException + { + final ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + + try (JsonGenerator gen = objectMapper.getFactory().createGenerator(bytes)) { + for (T result : results) { + gen.writeObject(result); + } + } + + return bytes.toByteArray(); + } + private static class AssertingClosable implements Closeable { diff --git a/server/src/test/java/io/druid/client/cache/CachePopulatorTest.java b/server/src/test/java/io/druid/client/cache/CachePopulatorTest.java new file mode 100644 index 00000000000..b855e20288a --- /dev/null +++ b/server/src/test/java/io/druid/client/cache/CachePopulatorTest.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.client.cache; + +import com.fasterxml.jackson.databind.MappingIterator; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterators; +import com.google.common.primitives.Ints; +import io.druid.java.util.common.concurrent.Execs; +import io.druid.java.util.common.guava.Sequences; +import io.druid.java.util.common.jackson.JacksonUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.stream.Collectors; + +public class CachePopulatorTest +{ + private final ExecutorService exec = Execs.multiThreaded(2, "cache-populator-test-%d"); + private final ObjectMapper objectMapper = new ObjectMapper(); + private final Cache cache = new MapCache(new ByteCountingLRUMap(Long.MAX_VALUE)); + private final CachePopulatorStats stats = new CachePopulatorStats(); + + @After + public void tearDown() + { + exec.shutdownNow(); + } + + @Test + public void testForegroundPopulator() + { + final CachePopulator populator = new ForegroundCachePopulator(objectMapper, stats, -1); + final List strings = ImmutableList.of("foo", "bar"); + + Assert.assertEquals(strings, wrapAndReturn(populator, makeKey(1), strings)); + Assert.assertEquals(strings, readFromCache(makeKey(1))); + Assert.assertEquals(1, stats.snapshot().getNumOk()); + Assert.assertEquals(0, stats.snapshot().getNumError()); + Assert.assertEquals(0, stats.snapshot().getNumOversized()); + } + + @Test + public void testForegroundPopulatorMaxEntrySize() + { + final CachePopulator populator = new ForegroundCachePopulator(objectMapper, stats, 30); + final List strings = ImmutableList.of("foo", "bar"); + final List strings2 = ImmutableList.of("foo", "baralararararararaarararararaa"); + + Assert.assertEquals(strings, wrapAndReturn(populator, makeKey(1), strings)); + Assert.assertEquals(strings, readFromCache(makeKey(1))); + Assert.assertEquals(strings2, wrapAndReturn(populator, makeKey(2), strings2)); + Assert.assertNull(readFromCache(makeKey(2))); + + Assert.assertEquals(1, stats.snapshot().getNumOk()); + Assert.assertEquals(0, stats.snapshot().getNumError()); + Assert.assertEquals(1, stats.snapshot().getNumOversized()); + } + + @Test(timeout = 60000L) + public void testBackgroundPopulator() throws InterruptedException + { + final CachePopulator populator = new BackgroundCachePopulator(exec, objectMapper, stats, -1); + final List strings = ImmutableList.of("foo", "bar"); + + Assert.assertEquals(strings, wrapAndReturn(populator, makeKey(1), strings)); + + // Wait for background updates to happen. + while (cache.getStats().getNumEntries() < 1) { + Thread.sleep(100); + } + + Assert.assertEquals(strings, readFromCache(makeKey(1))); + Assert.assertEquals(1, stats.snapshot().getNumOk()); + Assert.assertEquals(0, stats.snapshot().getNumError()); + Assert.assertEquals(0, stats.snapshot().getNumOversized()); + } + + @Test(timeout = 60000L) + public void testBackgroundPopulatorMaxEntrySize() throws InterruptedException + { + final CachePopulator populator = new BackgroundCachePopulator(exec, objectMapper, stats, 30); + final List strings = ImmutableList.of("foo", "bar"); + final List strings2 = ImmutableList.of("foo", "baralararararararaarararararaa"); + + Assert.assertEquals(strings, wrapAndReturn(populator, makeKey(1), strings)); + Assert.assertEquals(strings2, wrapAndReturn(populator, makeKey(2), strings2)); + + // Wait for background updates to happen. + while (cache.getStats().getNumEntries() < 1 || stats.snapshot().getNumOversized() < 1) { + Thread.sleep(100); + } + + Assert.assertEquals(strings, readFromCache(makeKey(1))); + Assert.assertNull(readFromCache(makeKey(2))); + Assert.assertEquals(1, stats.snapshot().getNumOk()); + Assert.assertEquals(0, stats.snapshot().getNumError()); + Assert.assertEquals(1, stats.snapshot().getNumOversized()); + } + + private static Cache.NamedKey makeKey(final int n) + { + return new Cache.NamedKey("test", Ints.toByteArray(n)); + } + + private List wrapAndReturn( + final CachePopulator populator, + final Cache.NamedKey key, + final List strings + ) + { + return populator.wrap(Sequences.simple(strings), s -> ImmutableMap.of("s", s), cache, key).toList(); + } + + private List readFromCache(final Cache.NamedKey key) + { + final byte[] bytes = cache.get(key); + if (bytes == null) { + return null; + } + + try ( + final MappingIterator> iterator = objectMapper.readValues( + objectMapper.getFactory().createParser(bytes), + JacksonUtils.TYPE_REFERENCE_MAP_STRING_STRING + ) + ) { + final List> retVal = new ArrayList<>(); + Iterators.addAll(retVal, iterator); + + // Undo map-wrapping that was done in wrapAndReturn. + return retVal.stream().map(m -> m.get("s")).collect(Collectors.toList()); + } + catch (IOException e) { + throw new RuntimeException(e); + } + } +} diff --git a/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java b/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java index 7b503aa2348..f88f5bffc6c 100644 --- a/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java +++ b/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java @@ -22,6 +22,7 @@ package io.druid.segment.realtime; import com.fasterxml.jackson.databind.InjectableValues; import com.fasterxml.jackson.databind.ObjectMapper; import io.druid.client.cache.CacheConfig; +import io.druid.client.cache.CachePopulatorStats; import io.druid.client.cache.MapCache; import io.druid.data.input.impl.DimensionsSpec; import io.druid.data.input.impl.JSONParseSpec; @@ -115,6 +116,7 @@ public class FireDepartmentTest TestHelper.getTestIndexIO(OffHeapMemorySegmentWriteOutMediumFactory.instance()), MapCache.create(0), NO_CACHE_CONFIG, + new CachePopulatorStats(), TestHelper.makeJsonMapper() ), diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java index 3c675c69c17..c5522d9fbfe 100644 --- a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java +++ b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java @@ -22,6 +22,7 @@ package io.druid.segment.realtime.appenderator; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableMap; import io.druid.client.cache.CacheConfig; +import io.druid.client.cache.CachePopulatorStats; import io.druid.client.cache.MapCache; import io.druid.data.input.impl.DimensionsSpec; import io.druid.data.input.impl.JSONParseSpec; @@ -273,7 +274,8 @@ public class AppenderatorTester implements AutoCloseable emitter, queryExecutor, MapCache.create(2048), - new CacheConfig() + new CacheConfig(), + new CachePopulatorStats() ); } diff --git a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java index a5ad8a4633e..ef964178fe8 100644 --- a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java +++ b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java @@ -26,6 +26,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.io.Files; import com.google.common.util.concurrent.MoreExecutors; +import io.druid.client.cache.CachePopulatorStats; import io.druid.client.cache.MapCache; import io.druid.data.input.Committer; import io.druid.data.input.InputRow; @@ -223,6 +224,7 @@ public class RealtimePlumberSchoolTest TestHelper.getTestIndexIO(segmentWriteOutMediumFactory), MapCache.create(0), FireDepartmentTest.NO_CACHE_CONFIG, + new CachePopulatorStats(), TestHelper.makeJsonMapper() ); diff --git a/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java b/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java index 1287748e596..df4aebf0a6b 100644 --- a/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java +++ b/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java @@ -26,8 +26,9 @@ import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; -import com.google.common.util.concurrent.MoreExecutors; import io.druid.client.cache.CacheConfig; +import io.druid.client.cache.CachePopulatorStats; +import io.druid.client.cache.ForegroundCachePopulator; import io.druid.client.cache.LocalCacheProvider; import io.druid.jackson.DefaultObjectMapper; import io.druid.java.util.common.IAE; @@ -153,7 +154,7 @@ public class ServerManagerTest }, new NoopServiceEmitter(), serverManagerExec, - MoreExecutors.sameThreadExecutor(), + new ForegroundCachePopulator(new DefaultObjectMapper(), new CachePopulatorStats(), -1), new DefaultObjectMapper(), new LocalCacheProvider().get(), new CacheConfig(),