diff --git a/docs/content/configuration/caching.md b/docs/content/configuration/caching.md index ddee7c49736..babf8e42390 100644 --- a/docs/content/configuration/caching.md +++ b/docs/content/configuration/caching.md @@ -4,8 +4,9 @@ layout: doc_page # Caching -Caching can optionally be enabled on the broker and / or historical nodes. -See the [broker](broker.html#caching) and [historical](historical.html#caching) +Caching can optionally be enabled on the broker, historical, and realtime +nodes, as well as realtime index tasks. See [broker](broker.html#caching), +[historical](historical.html#caching), and [realtime](realtime.html#caching) configuration options for how to enable it for individual node types. Druid uses a local in-memory cache by default, unless a diffrent type of cache is specified. diff --git a/docs/content/configuration/index.md b/docs/content/configuration/index.md index 253e785f629..0ea9c8b80fd 100644 --- a/docs/content/configuration/index.md +++ b/docs/content/configuration/index.md @@ -222,14 +222,14 @@ This deep storage is used to interface with Cassandra. ### Caching -You can enable caching of results at the broker/historical using following configurations. +You can enable caching of results at the broker, historical, or realtime level using following configurations. |Property|Description|Default| |--------|-----------|-------| |`druid.cache.type`|`local`, `memcached`|The type of cache to use for queries.|`local`| -|`druid.(broker/historical).cache.unCacheable`|All druid query types|All query types to not cache.|["groupBy", "select"]| -|`druid.(broker/historical).cache.useCache`|Whether to use cache for getting query results.|false| -|`druid.(broker/historical).cache.populateCache`|Whether to populate cache.|false| +|`druid.(broker|historical|realtime).cache.unCacheable`|All druid query types|All query types to not cache.|["groupBy", "select"]| +|`druid.(broker|historical|realtime).cache.useCache`|Whether to use cache for getting query results.|false| +|`druid.(broker|historical|realtime).cache.populateCache`|Whether to populate cache.|false| #### Local Cache diff --git a/docs/content/configuration/realtime.md b/docs/content/configuration/realtime.md index ccdce606995..48b8623e3e4 100644 --- a/docs/content/configuration/realtime.md +++ b/docs/content/configuration/realtime.md @@ -59,5 +59,14 @@ The realtime node uses several of the global configs in [Configuration](../confi |--------|-----------|-------| |`druid.query.search.maxSearchLimit`|Maximum number of search results to return.|1000| +### Caching +You can optionally configure caching to be enabled on the realtime node by setting caching configs here. +|Property|Possible Values|Description|Default| +|--------|---------------|-----------|-------| +|`druid.realtime.cache.useCache`|true, false|Enable the cache on the realtime.|false| +|`druid.realtime.cache.populateCache`|true, false|Populate the cache on the realtime.|false| +|`druid.realtime.cache.unCacheable`|All druid query types|All query types to not cache.|`["groupBy", "select"]`| + +See [cache configuration](caching.html) for how to configure cache settings. diff --git a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java index fb5b7515a7e..0ef8510fc1b 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java @@ -27,6 +27,8 @@ import com.google.common.collect.Multimaps; import com.metamx.emitter.service.ServiceEmitter; import com.metamx.metrics.MonitorScheduler; import io.druid.client.FilteredServerView; +import io.druid.client.cache.Cache; +import io.druid.client.cache.CacheConfig; import io.druid.indexing.common.actions.SegmentInsertAction; import io.druid.indexing.common.actions.TaskActionClient; import io.druid.indexing.common.actions.TaskActionClientFactory; @@ -75,6 +77,9 @@ public class TaskToolbox private final File taskWorkDir; private final IndexMerger indexMerger; private final IndexIO indexIO; + private final Cache cache; + private final CacheConfig cacheConfig; + public TaskToolbox( TaskConfig config, @@ -94,7 +99,9 @@ public class TaskToolbox ObjectMapper objectMapper, File taskWorkDir, IndexMerger indexMerger, - IndexIO indexIO + IndexIO indexIO, + Cache cache, + CacheConfig cacheConfig ) { this.config = config; @@ -115,6 +122,8 @@ public class TaskToolbox this.taskWorkDir = taskWorkDir; this.indexMerger = Preconditions.checkNotNull(indexMerger, "Null IndexMerger"); this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO"); + this.cache = cache; + this.cacheConfig = cacheConfig; } public TaskConfig getConfig() @@ -227,4 +236,14 @@ public class TaskToolbox { return indexMerger; } + + public Cache getCache() + { + return cache; + } + + public CacheConfig getCacheConfig() + { + return cacheConfig; + } } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java index 9ddb303cde9..ef8ef3e2b79 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java @@ -23,6 +23,8 @@ import com.google.inject.Inject; import com.metamx.emitter.service.ServiceEmitter; import com.metamx.metrics.MonitorScheduler; import io.druid.client.FilteredServerView; +import io.druid.client.cache.Cache; +import io.druid.client.cache.CacheConfig; import io.druid.guice.annotations.Processing; import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.config.TaskConfig; @@ -60,6 +62,8 @@ public class TaskToolboxFactory private final ObjectMapper objectMapper; private final IndexMerger indexMerger; private final IndexIO indexIO; + private final Cache cache; + private final CacheConfig cacheConfig; @Inject public TaskToolboxFactory( @@ -78,7 +82,9 @@ public class TaskToolboxFactory SegmentLoaderFactory segmentLoaderFactory, ObjectMapper objectMapper, IndexMerger indexMerger, - IndexIO indexIO + IndexIO indexIO, + Cache cache, + CacheConfig cacheConfig ) { this.config = config; @@ -97,6 +103,8 @@ public class TaskToolboxFactory this.objectMapper = objectMapper; this.indexMerger = Preconditions.checkNotNull(indexMerger, "Null IndexMerger"); this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO"); + this.cache = cache; + this.cacheConfig = cacheConfig; } public TaskToolbox build(Task task) @@ -121,7 +129,9 @@ public class TaskToolboxFactory objectMapper, taskWorkDir, indexMerger, - indexIO + indexIO, + cache, + cacheConfig ); } } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java index 22000c4a540..b4f75004031 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java @@ -277,7 +277,10 @@ public class RealtimeIndexTask extends AbstractTask toolbox.getNewSegmentServerView(), toolbox.getQueryExecutorService(), toolbox.getIndexMerger(), - toolbox.getIndexIO() + toolbox.getIndexIO(), + toolbox.getCache(), + toolbox.getCacheConfig(), + toolbox.getObjectMapper() ); this.plumber = plumberSchool.findPlumber(dataSchema, tuningConfig, fireDepartment.getMetrics()); diff --git a/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java b/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java index 0197c5c346e..ac8475a993b 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java @@ -24,6 +24,8 @@ import com.google.common.collect.ImmutableList; import com.metamx.emitter.service.ServiceEmitter; import com.metamx.metrics.MonitorScheduler; import io.druid.client.FilteredServerView; +import io.druid.client.cache.Cache; +import io.druid.client.cache.CacheConfig; import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.config.TaskConfig; import io.druid.indexing.common.task.Task; @@ -74,6 +76,8 @@ public class TaskToolboxTest private Task task = EasyMock.createMock(Task.class); private IndexMerger mockIndexMerger = EasyMock.createMock(IndexMerger.class); private IndexIO mockIndexIO = EasyMock.createMock(IndexIO.class); + private Cache mockCache = EasyMock.createMock(Cache.class); + private CacheConfig mockCacheConfig = EasyMock.createMock(CacheConfig.class); @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); @@ -100,7 +104,9 @@ public class TaskToolboxTest new SegmentLoaderFactory(mockSegmentLoaderLocalCacheManager), ObjectMapper, mockIndexMerger, - mockIndexIO + mockIndexIO, + mockCache, + mockCacheConfig ); } @@ -180,4 +186,16 @@ public class TaskToolboxTest { Assert.assertEquals(mockDataSegmentMover, taskToolbox.build(task).getDataSegmentMover()); } + + @Test + public void testGetCache() throws Exception + { + Assert.assertEquals(mockCache, taskToolbox.build(task).getCache()); + } + + @Test + public void testGetCacheConfig() throws Exception + { + Assert.assertEquals(mockCacheConfig, taskToolbox.build(task).getCacheConfig()); + } } diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java index a01ffb62c48..afa239fed7b 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java @@ -257,7 +257,7 @@ public class IndexTaskTest return segment; } }, null, null, null, null, null, null, null, null, null, null, temporaryFolder.newFolder(), - indexMerger, indexIO + indexMerger, indexIO, null, null ) ); diff --git a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java index 5ece6cd52a8..c53805e53f2 100644 --- a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java @@ -260,7 +260,9 @@ public class IngestSegmentFirehoseFactoryTest ), MAPPER, INDEX_MERGER, - INDEX_IO + INDEX_IO, + null, + null ); Collection values = new LinkedList<>(); for (InputRowParser parser : Arrays.asList( diff --git a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java index 3bd5de2c6ae..2f9e9dbf690 100644 --- a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java @@ -328,7 +328,9 @@ public class IngestSegmentFirehoseFactoryTimelineTest ), MAPPER, INDEX_MERGER, - INDEX_IO + INDEX_IO, + null, + null ); final Injector injector = Guice.createInjector( new Module() diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java index 7d326d76567..7071935fb6f 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java @@ -44,6 +44,7 @@ import com.metamx.metrics.Monitor; import com.metamx.metrics.MonitorScheduler; import io.druid.client.FilteredServerView; import io.druid.client.ServerView; +import io.druid.client.cache.MapCache; import io.druid.data.input.Firehose; import io.druid.data.input.FirehoseFactory; import io.druid.data.input.InputRow; @@ -67,6 +68,7 @@ import io.druid.indexing.common.task.AbstractFixedIntervalTask; import io.druid.indexing.common.task.IndexTask; import io.druid.indexing.common.task.KillTask; import io.druid.indexing.common.task.RealtimeIndexTask; +import io.druid.indexing.common.task.RealtimeIndexTaskTest; import io.druid.indexing.common.task.Task; import io.druid.indexing.common.task.TaskResource; import io.druid.indexing.overlord.config.TaskQueueConfig; @@ -94,6 +96,7 @@ import io.druid.segment.loading.SegmentLoaderLocalCacheManager; import io.druid.segment.loading.SegmentLoadingException; import io.druid.segment.loading.StorageLocationConfig; import io.druid.segment.realtime.FireDepartment; +import io.druid.segment.realtime.FireDepartmentTest; import io.druid.server.coordination.DataSegmentAnnouncer; import io.druid.server.coordination.DruidServerMetadata; import io.druid.timeline.DataSegment; @@ -484,7 +487,9 @@ public class TaskLifecycleTest ), MAPPER, INDEX_MERGER, - INDEX_IO + INDEX_IO, + MapCache.create(0), + FireDepartmentTest.NO_CACHE_CONFIG ); tr = new ThreadPoolTaskRunner(tb, null); tq = new TaskQueue(tqc, ts, tr, tac, tl, emitter); diff --git a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java index 9b36224fdaa..aa691c757fc 100644 --- a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java @@ -156,7 +156,9 @@ public class WorkerTaskMonitorTest ), jsonMapper, indexMerger, - indexIO + indexIO, + null, + null ), null ), diff --git a/processing/src/test/java/io/druid/segment/TestHelper.java b/processing/src/test/java/io/druid/segment/TestHelper.java index 19fed2684d9..a255f8d4fc2 100644 --- a/processing/src/test/java/io/druid/segment/TestHelper.java +++ b/processing/src/test/java/io/druid/segment/TestHelper.java @@ -36,11 +36,11 @@ public class TestHelper private static final IndexMerger INDEX_MERGER; private static final IndexMaker INDEX_MAKER; private static final IndexIO INDEX_IO; + public static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper(); static { - ObjectMapper jsonMapper = new DefaultObjectMapper(); INDEX_IO = new IndexIO( - jsonMapper, + JSON_MAPPER, new ColumnConfig() { @Override @@ -50,10 +50,11 @@ public class TestHelper } } ); - INDEX_MERGER = new IndexMerger(jsonMapper, INDEX_IO); - INDEX_MAKER = new IndexMaker(jsonMapper, INDEX_IO); + INDEX_MERGER = new IndexMerger(JSON_MAPPER, INDEX_IO); + INDEX_MAKER = new IndexMaker(JSON_MAPPER, INDEX_IO); } + public static IndexMerger getTestIndexMerger() { return INDEX_MERGER; @@ -69,6 +70,10 @@ public class TestHelper return INDEX_IO; } + public static ObjectMapper getObjectMapper() { + return JSON_MAPPER; + } + public static void assertExpectedResults(Iterable> expectedResults, Sequence> results) { assertResults(expectedResults, Sequences.toList(results, Lists.>newArrayList()), ""); diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumber.java index cd260d0a2f5..97b87c0e1a3 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumber.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumber.java @@ -17,11 +17,14 @@ package io.druid.segment.realtime.plumber; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Lists; import com.metamx.common.Granularity; import com.metamx.common.concurrent.ScheduledExecutors; import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.service.ServiceEmitter; +import io.druid.client.cache.Cache; +import io.druid.client.cache.CacheConfig; import io.druid.common.guava.ThreadRenamingCallable; import io.druid.concurrent.Execs; import io.druid.query.QueryRunnerFactoryConglomerate; @@ -63,7 +66,11 @@ public class FlushingPlumber extends RealtimePlumber DataSegmentAnnouncer segmentAnnouncer, ExecutorService queryExecutorService, IndexMerger indexMerger, - IndexIO indexIO + IndexIO indexIO, + Cache cache, + CacheConfig cacheConfig, + ObjectMapper objectMapper + ) { super( @@ -78,7 +85,10 @@ public class FlushingPlumber extends RealtimePlumber null, null, indexMerger, - indexIO + indexIO, + cache, + cacheConfig, + objectMapper ); this.flushDuration = flushDuration; diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumberSchool.java b/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumberSchool.java index d54ff38abc3..cffb33f922b 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumberSchool.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumberSchool.java @@ -20,8 +20,11 @@ package io.druid.segment.realtime.plumber; import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import com.metamx.emitter.service.ServiceEmitter; +import io.druid.client.cache.Cache; +import io.druid.client.cache.CacheConfig; import io.druid.guice.annotations.Processing; import io.druid.query.QueryRunnerFactoryConglomerate; import io.druid.segment.IndexIO; @@ -50,6 +53,9 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool private final ExecutorService queryExecutorService; private final IndexMerger indexMerger; private final IndexIO indexIO; + private final Cache cache; + private final CacheConfig cacheConfig; + private final ObjectMapper objectMapper; @JsonCreator public FlushingPlumberSchool( @@ -59,7 +65,10 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool @JacksonInject DataSegmentAnnouncer segmentAnnouncer, @JacksonInject @Processing ExecutorService queryExecutorService, @JacksonInject IndexMerger indexMerger, - @JacksonInject IndexIO indexIO + @JacksonInject IndexIO indexIO, + @JacksonInject Cache cache, + @JacksonInject CacheConfig cacheConfig, + @JacksonInject ObjectMapper objectMapper ) { super( @@ -71,7 +80,10 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool null, queryExecutorService, indexMerger, - indexIO + indexIO, + cache, + cacheConfig, + objectMapper ); this.flushDuration = flushDuration == null ? defaultFlushDuration : flushDuration; @@ -81,6 +93,9 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool this.queryExecutorService = queryExecutorService; this.indexMerger = Preconditions.checkNotNull(indexMerger, "Null IndexMerger"); this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO"); + this.cache = cache; + this.cacheConfig = cacheConfig; + this.objectMapper = objectMapper; } @Override @@ -102,7 +117,10 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool segmentAnnouncer, queryExecutorService, indexMerger, - indexIO + indexIO, + cache, + cacheConfig, + objectMapper ); } diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java index cd9261057f8..40b1a47a7a3 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java @@ -19,6 +19,7 @@ package io.druid.segment.realtime.plumber; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; @@ -40,8 +41,11 @@ import com.metamx.common.guava.FunctionalIterable; import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceMetricEvent; +import io.druid.client.CachingQueryRunner; import io.druid.client.FilteredServerView; import io.druid.client.ServerView; +import io.druid.client.cache.Cache; +import io.druid.client.cache.CacheConfig; import io.druid.common.guava.ThreadRenamingCallable; import io.druid.common.guava.ThreadRenamingRunnable; import io.druid.common.utils.VMUtils; @@ -63,6 +67,7 @@ import io.druid.segment.IndexMerger; import io.druid.segment.IndexSpec; import io.druid.segment.QueryableIndex; import io.druid.segment.QueryableIndexSegment; +import io.druid.segment.ReferenceCountingSegment; import io.druid.segment.Segment; import io.druid.segment.incremental.IndexSizeExceededException; import io.druid.segment.indexing.DataSchema; @@ -120,6 +125,10 @@ public class RealtimePlumber implements Plumber String.CASE_INSENSITIVE_ORDER ); + private final Cache cache; + private final CacheConfig cacheConfig; + private final ObjectMapper objectMapper; + private volatile long nextFlush = 0; private volatile boolean shuttingDown = false; private volatile boolean stopped = false; @@ -145,7 +154,10 @@ public class RealtimePlumber implements Plumber SegmentPublisher segmentPublisher, FilteredServerView serverView, IndexMerger indexMerger, - IndexIO indexIO + IndexIO indexIO, + Cache cache, + CacheConfig cacheConfig, + ObjectMapper objectMapper ) { this.schema = schema; @@ -161,6 +173,13 @@ public class RealtimePlumber implements Plumber this.serverView = serverView; this.indexMerger = Preconditions.checkNotNull(indexMerger, "Null IndexMerger"); this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO"); + this.cache = cache; + this.cacheConfig = cacheConfig; + this.objectMapper = objectMapper; + + if(!cache.isLocal()) { + log.error("Configured cache is not local, caching will not be enabled"); + } log.info("Creating plumber using rejectionPolicy[%s]", getRejectionPolicy()); } @@ -323,9 +342,25 @@ public class RealtimePlumber implements Plumber } // Prevent the underlying segment from closing when its being iterated - final Closeable closeable = input.getSegment().increment(); + final ReferenceCountingSegment segment = input.getSegment(); + final Closeable closeable = segment.increment(); try { - return factory.createRunner(input.getSegment()); + if (input.hasSwapped() // only use caching if data is immutable + && cache.isLocal() // hydrants may not be in sync between replicas, make sure cache is local + ) { + return new CachingQueryRunner<>( + makeHydrantIdentifier(input, segment), + descriptor, + objectMapper, + cache, + toolchest, + factory.createRunner(segment), + MoreExecutors.sameThreadExecutor(), + cacheConfig + ); + } else { + return factory.createRunner(input.getSegment()); + } } finally { try { @@ -353,6 +388,11 @@ public class RealtimePlumber implements Plumber ); } + protected static String makeHydrantIdentifier(FireHydrant input, ReferenceCountingSegment segment) + { + return segment.getIdentifier() + "_" + input.getCount(); + } + @Override public void persist(final Committer committer) { @@ -901,6 +941,9 @@ public class RealtimePlumber implements Plumber sink.getVersion(), new SingleElementPartitionChunk<>(sink) ); + for (FireHydrant hydrant : sink) { + cache.close(makeHydrantIdentifier(hydrant, hydrant.getSegment())); + } synchronized (handoffCondition) { handoffCondition.notifyAll(); } diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java index ee40c2058d4..07eea0dbe1c 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java @@ -19,9 +19,12 @@ package io.druid.segment.realtime.plumber; import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import com.metamx.emitter.service.ServiceEmitter; import io.druid.client.FilteredServerView; +import io.druid.client.cache.Cache; +import io.druid.client.cache.CacheConfig; import io.druid.guice.annotations.Processing; import io.druid.query.QueryRunnerFactoryConglomerate; import io.druid.segment.IndexIO; @@ -48,6 +51,9 @@ public class RealtimePlumberSchool implements PlumberSchool private final ExecutorService queryExecutorService; private final IndexMerger indexMerger; private final IndexIO indexIO; + private final Cache cache; + private final CacheConfig cacheConfig; + private final ObjectMapper objectMapper; @JsonCreator public RealtimePlumberSchool( @@ -59,8 +65,11 @@ public class RealtimePlumberSchool implements PlumberSchool @JacksonInject FilteredServerView serverView, @JacksonInject @Processing ExecutorService executorService, @JacksonInject IndexMerger indexMerger, - @JacksonInject IndexIO indexIO - ) + @JacksonInject IndexIO indexIO, + @JacksonInject Cache cache, + @JacksonInject CacheConfig cacheConfig, + @JacksonInject ObjectMapper objectMapper + ) { this.emitter = emitter; this.conglomerate = conglomerate; @@ -71,6 +80,10 @@ public class RealtimePlumberSchool implements PlumberSchool this.queryExecutorService = executorService; this.indexMerger = Preconditions.checkNotNull(indexMerger, "Null IndexMerger"); this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO"); + + this.cache = cache; + this.cacheConfig = cacheConfig; + this.objectMapper = objectMapper; } @Override @@ -94,7 +107,10 @@ public class RealtimePlumberSchool implements PlumberSchool segmentPublisher, serverView, indexMerger, - indexIO + indexIO, + cache, + cacheConfig, + objectMapper ); } diff --git a/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java b/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java index 4a17b89d873..6ead8cfe025 100644 --- a/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java +++ b/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java @@ -20,6 +20,8 @@ package io.druid.segment.realtime; import com.fasterxml.jackson.databind.InjectableValues; import com.fasterxml.jackson.databind.ObjectMapper; import com.metamx.common.Granularity; +import io.druid.client.cache.CacheConfig; +import io.druid.client.cache.MapCache; import io.druid.data.input.impl.DimensionsSpec; import io.druid.data.input.impl.JSONParseSpec; import io.druid.data.input.impl.StringInputRowParser; @@ -44,6 +46,22 @@ import java.util.Map; */ public class FireDepartmentTest { + + public static final CacheConfig NO_CACHE_CONFIG = new CacheConfig() + { + @Override + public boolean isPopulateCache() + { + return false; + } + + @Override + public boolean isUseCache() + { + return false; + } + }; + @Test public void testSerde() throws Exception { @@ -87,7 +105,11 @@ public class FireDepartmentTest null, null, TestHelper.getTestIndexMerger(), - TestHelper.getTestIndexIO() + TestHelper.getTestIndexIO(), + MapCache.create(0), + NO_CACHE_CONFIG, + TestHelper.getObjectMapper() + ), null ), diff --git a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java index cab23a1c84f..47bf49c10db 100644 --- a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java +++ b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java @@ -30,6 +30,7 @@ import com.metamx.common.Granularity; import com.metamx.emitter.service.ServiceEmitter; import io.druid.client.FilteredServerView; import io.druid.client.ServerView; +import io.druid.client.cache.MapCache; import io.druid.data.input.Committer; import io.druid.data.input.InputRow; import io.druid.data.input.Row; @@ -50,6 +51,7 @@ import io.druid.segment.indexing.RealtimeTuningConfig; import io.druid.segment.indexing.granularity.UniformGranularitySpec; import io.druid.segment.loading.DataSegmentPusher; import io.druid.segment.realtime.FireDepartmentMetrics; +import io.druid.segment.realtime.FireDepartmentTest; import io.druid.segment.realtime.FireHydrant; import io.druid.segment.realtime.SegmentPublisher; import io.druid.server.coordination.DataSegmentAnnouncer; @@ -193,7 +195,10 @@ public class RealtimePlumberSchoolTest serverView, MoreExecutors.sameThreadExecutor(), TestHelper.getTestIndexMerger(), - TestHelper.getTestIndexIO() + TestHelper.getTestIndexIO(), + MapCache.create(0), + FireDepartmentTest.NO_CACHE_CONFIG, + TestHelper.getObjectMapper() ); metrics = new FireDepartmentMetrics(); diff --git a/services/src/main/java/io/druid/cli/CliPeon.java b/services/src/main/java/io/druid/cli/CliPeon.java index 7c405390736..0a2af6948dd 100644 --- a/services/src/main/java/io/druid/cli/CliPeon.java +++ b/services/src/main/java/io/druid/cli/CliPeon.java @@ -30,7 +30,9 @@ import com.metamx.common.logger.Logger; import io.airlift.airline.Arguments; import io.airlift.airline.Command; import io.airlift.airline.Option; +import io.druid.client.cache.CacheConfig; import io.druid.guice.Binders; +import io.druid.guice.CacheModule; import io.druid.guice.IndexingServiceFirehoseModule; import io.druid.guice.Jerseys; import io.druid.guice.JsonConfigProvider; @@ -156,6 +158,9 @@ public class CliPeon extends GuiceRunnable binder.bind(QuerySegmentWalker.class).to(ThreadPoolTaskRunner.class); binder.bind(ThreadPoolTaskRunner.class).in(ManageLifecycle.class); + JsonConfigProvider.bind(binder, "druid.realtime.cache", CacheConfig.class); + binder.install(new CacheModule()); + // Override the default SegmentLoaderConfig because we don't actually care about the // configuration based locations. This will override them anyway. This is also stopping // configuration of other parameters, but I don't think that's actually a problem. diff --git a/services/src/main/java/io/druid/guice/RealtimeModule.java b/services/src/main/java/io/druid/guice/RealtimeModule.java index fcc12f5d5bb..f7511ef0348 100644 --- a/services/src/main/java/io/druid/guice/RealtimeModule.java +++ b/services/src/main/java/io/druid/guice/RealtimeModule.java @@ -23,6 +23,7 @@ import com.google.inject.Module; import com.google.inject.TypeLiteral; import com.google.inject.multibindings.MapBinder; import io.druid.cli.QueryJettyServerInitializer; +import io.druid.client.cache.CacheConfig; import io.druid.metadata.MetadataSegmentPublisher; import io.druid.query.QuerySegmentWalker; import io.druid.segment.realtime.FireDepartment; @@ -79,6 +80,9 @@ public class RealtimeModule implements Module .toProvider(FireDepartmentsProvider.class) .in(LazySingleton.class); + JsonConfigProvider.bind(binder, "druid.realtime.cache", CacheConfig.class); + binder.install(new CacheModule()); + binder.bind(QuerySegmentWalker.class).to(RealtimeManager.class).in(ManageLifecycle.class); binder.bind(NodeTypeConfig.class).toInstance(new NodeTypeConfig("realtime")); binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class).in(LazySingleton.class);