enable query caching on intermediate realtime persists

This commit is contained in:
Xavier Léauté 2015-11-10 10:23:56 -08:00
parent 0fb7e4e040
commit d7eb2f717e
21 changed files with 232 additions and 33 deletions

View File

@ -4,8 +4,9 @@ layout: doc_page
# Caching # Caching
Caching can optionally be enabled on the broker and / or historical nodes. Caching can optionally be enabled on the broker, historical, and realtime
See the [broker](broker.html#caching) and [historical](historical.html#caching) nodes, as well as realtime index tasks. See [broker](broker.html#caching),
[historical](historical.html#caching), and [realtime](realtime.html#caching)
configuration options for how to enable it for individual node types. configuration options for how to enable it for individual node types.
Druid uses a local in-memory cache by default, unless a diffrent type of cache is specified. Druid uses a local in-memory cache by default, unless a diffrent type of cache is specified.

View File

@ -222,14 +222,14 @@ This deep storage is used to interface with Cassandra.
### Caching ### Caching
You can enable caching of results at the broker/historical using following configurations. You can enable caching of results at the broker, historical, or realtime level using following configurations.
|Property|Description|Default| |Property|Description|Default|
|--------|-----------|-------| |--------|-----------|-------|
|`druid.cache.type`|`local`, `memcached`|The type of cache to use for queries.|`local`| |`druid.cache.type`|`local`, `memcached`|The type of cache to use for queries.|`local`|
|`druid.(broker/historical).cache.unCacheable`|All druid query types|All query types to not cache.|["groupBy", "select"]| |`druid.(broker|historical|realtime).cache.unCacheable`|All druid query types|All query types to not cache.|["groupBy", "select"]|
|`druid.(broker/historical).cache.useCache`|Whether to use cache for getting query results.|false| |`druid.(broker|historical|realtime).cache.useCache`|Whether to use cache for getting query results.|false|
|`druid.(broker/historical).cache.populateCache`|Whether to populate cache.|false| |`druid.(broker|historical|realtime).cache.populateCache`|Whether to populate cache.|false|
#### Local Cache #### Local Cache

View File

@ -59,5 +59,14 @@ The realtime node uses several of the global configs in [Configuration](../confi
|--------|-----------|-------| |--------|-----------|-------|
|`druid.query.search.maxSearchLimit`|Maximum number of search results to return.|1000| |`druid.query.search.maxSearchLimit`|Maximum number of search results to return.|1000|
### Caching
You can optionally configure caching to be enabled on the realtime node by setting caching configs here.
|Property|Possible Values|Description|Default|
|--------|---------------|-----------|-------|
|`druid.realtime.cache.useCache`|true, false|Enable the cache on the realtime.|false|
|`druid.realtime.cache.populateCache`|true, false|Populate the cache on the realtime.|false|
|`druid.realtime.cache.unCacheable`|All druid query types|All query types to not cache.|`["groupBy", "select"]`|
See [cache configuration](caching.html) for how to configure cache settings.

View File

@ -27,6 +27,8 @@ import com.google.common.collect.Multimaps;
import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.metrics.MonitorScheduler; import com.metamx.metrics.MonitorScheduler;
import io.druid.client.FilteredServerView; import io.druid.client.FilteredServerView;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.indexing.common.actions.SegmentInsertAction; import io.druid.indexing.common.actions.SegmentInsertAction;
import io.druid.indexing.common.actions.TaskActionClient; import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.actions.TaskActionClientFactory;
@ -75,6 +77,9 @@ public class TaskToolbox
private final File taskWorkDir; private final File taskWorkDir;
private final IndexMerger indexMerger; private final IndexMerger indexMerger;
private final IndexIO indexIO; private final IndexIO indexIO;
private final Cache cache;
private final CacheConfig cacheConfig;
public TaskToolbox( public TaskToolbox(
TaskConfig config, TaskConfig config,
@ -94,7 +99,9 @@ public class TaskToolbox
ObjectMapper objectMapper, ObjectMapper objectMapper,
File taskWorkDir, File taskWorkDir,
IndexMerger indexMerger, IndexMerger indexMerger,
IndexIO indexIO IndexIO indexIO,
Cache cache,
CacheConfig cacheConfig
) )
{ {
this.config = config; this.config = config;
@ -115,6 +122,8 @@ public class TaskToolbox
this.taskWorkDir = taskWorkDir; this.taskWorkDir = taskWorkDir;
this.indexMerger = Preconditions.checkNotNull(indexMerger, "Null IndexMerger"); this.indexMerger = Preconditions.checkNotNull(indexMerger, "Null IndexMerger");
this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO"); this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO");
this.cache = cache;
this.cacheConfig = cacheConfig;
} }
public TaskConfig getConfig() public TaskConfig getConfig()
@ -227,4 +236,14 @@ public class TaskToolbox
{ {
return indexMerger; return indexMerger;
} }
public Cache getCache()
{
return cache;
}
public CacheConfig getCacheConfig()
{
return cacheConfig;
}
} }

View File

@ -23,6 +23,8 @@ import com.google.inject.Inject;
import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.metrics.MonitorScheduler; import com.metamx.metrics.MonitorScheduler;
import io.druid.client.FilteredServerView; import io.druid.client.FilteredServerView;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.guice.annotations.Processing; import io.druid.guice.annotations.Processing;
import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.config.TaskConfig; import io.druid.indexing.common.config.TaskConfig;
@ -60,6 +62,8 @@ public class TaskToolboxFactory
private final ObjectMapper objectMapper; private final ObjectMapper objectMapper;
private final IndexMerger indexMerger; private final IndexMerger indexMerger;
private final IndexIO indexIO; private final IndexIO indexIO;
private final Cache cache;
private final CacheConfig cacheConfig;
@Inject @Inject
public TaskToolboxFactory( public TaskToolboxFactory(
@ -78,7 +82,9 @@ public class TaskToolboxFactory
SegmentLoaderFactory segmentLoaderFactory, SegmentLoaderFactory segmentLoaderFactory,
ObjectMapper objectMapper, ObjectMapper objectMapper,
IndexMerger indexMerger, IndexMerger indexMerger,
IndexIO indexIO IndexIO indexIO,
Cache cache,
CacheConfig cacheConfig
) )
{ {
this.config = config; this.config = config;
@ -97,6 +103,8 @@ public class TaskToolboxFactory
this.objectMapper = objectMapper; this.objectMapper = objectMapper;
this.indexMerger = Preconditions.checkNotNull(indexMerger, "Null IndexMerger"); this.indexMerger = Preconditions.checkNotNull(indexMerger, "Null IndexMerger");
this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO"); this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO");
this.cache = cache;
this.cacheConfig = cacheConfig;
} }
public TaskToolbox build(Task task) public TaskToolbox build(Task task)
@ -121,7 +129,9 @@ public class TaskToolboxFactory
objectMapper, objectMapper,
taskWorkDir, taskWorkDir,
indexMerger, indexMerger,
indexIO indexIO,
cache,
cacheConfig
); );
} }
} }

View File

@ -277,7 +277,10 @@ public class RealtimeIndexTask extends AbstractTask
toolbox.getNewSegmentServerView(), toolbox.getNewSegmentServerView(),
toolbox.getQueryExecutorService(), toolbox.getQueryExecutorService(),
toolbox.getIndexMerger(), toolbox.getIndexMerger(),
toolbox.getIndexIO() toolbox.getIndexIO(),
toolbox.getCache(),
toolbox.getCacheConfig(),
toolbox.getObjectMapper()
); );
this.plumber = plumberSchool.findPlumber(dataSchema, tuningConfig, fireDepartment.getMetrics()); this.plumber = plumberSchool.findPlumber(dataSchema, tuningConfig, fireDepartment.getMetrics());

View File

@ -24,6 +24,8 @@ import com.google.common.collect.ImmutableList;
import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.metrics.MonitorScheduler; import com.metamx.metrics.MonitorScheduler;
import io.druid.client.FilteredServerView; import io.druid.client.FilteredServerView;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.config.TaskConfig; import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.task.Task; import io.druid.indexing.common.task.Task;
@ -74,6 +76,8 @@ public class TaskToolboxTest
private Task task = EasyMock.createMock(Task.class); private Task task = EasyMock.createMock(Task.class);
private IndexMerger mockIndexMerger = EasyMock.createMock(IndexMerger.class); private IndexMerger mockIndexMerger = EasyMock.createMock(IndexMerger.class);
private IndexIO mockIndexIO = EasyMock.createMock(IndexIO.class); private IndexIO mockIndexIO = EasyMock.createMock(IndexIO.class);
private Cache mockCache = EasyMock.createMock(Cache.class);
private CacheConfig mockCacheConfig = EasyMock.createMock(CacheConfig.class);
@Rule @Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder(); public TemporaryFolder temporaryFolder = new TemporaryFolder();
@ -100,7 +104,9 @@ public class TaskToolboxTest
new SegmentLoaderFactory(mockSegmentLoaderLocalCacheManager), new SegmentLoaderFactory(mockSegmentLoaderLocalCacheManager),
ObjectMapper, ObjectMapper,
mockIndexMerger, mockIndexMerger,
mockIndexIO mockIndexIO,
mockCache,
mockCacheConfig
); );
} }
@ -180,4 +186,16 @@ public class TaskToolboxTest
{ {
Assert.assertEquals(mockDataSegmentMover, taskToolbox.build(task).getDataSegmentMover()); Assert.assertEquals(mockDataSegmentMover, taskToolbox.build(task).getDataSegmentMover());
} }
@Test
public void testGetCache() throws Exception
{
Assert.assertEquals(mockCache, taskToolbox.build(task).getCache());
}
@Test
public void testGetCacheConfig() throws Exception
{
Assert.assertEquals(mockCacheConfig, taskToolbox.build(task).getCacheConfig());
}
} }

View File

@ -257,7 +257,7 @@ public class IndexTaskTest
return segment; return segment;
} }
}, null, null, null, null, null, null, null, null, null, null, temporaryFolder.newFolder(), }, null, null, null, null, null, null, null, null, null, null, temporaryFolder.newFolder(),
indexMerger, indexIO indexMerger, indexIO, null, null
) )
); );

View File

@ -260,7 +260,9 @@ public class IngestSegmentFirehoseFactoryTest
), ),
MAPPER, MAPPER,
INDEX_MERGER, INDEX_MERGER,
INDEX_IO INDEX_IO,
null,
null
); );
Collection<Object[]> values = new LinkedList<>(); Collection<Object[]> values = new LinkedList<>();
for (InputRowParser parser : Arrays.<InputRowParser>asList( for (InputRowParser parser : Arrays.<InputRowParser>asList(

View File

@ -328,7 +328,9 @@ public class IngestSegmentFirehoseFactoryTimelineTest
), ),
MAPPER, MAPPER,
INDEX_MERGER, INDEX_MERGER,
INDEX_IO INDEX_IO,
null,
null
); );
final Injector injector = Guice.createInjector( final Injector injector = Guice.createInjector(
new Module() new Module()

View File

@ -44,6 +44,7 @@ import com.metamx.metrics.Monitor;
import com.metamx.metrics.MonitorScheduler; import com.metamx.metrics.MonitorScheduler;
import io.druid.client.FilteredServerView; import io.druid.client.FilteredServerView;
import io.druid.client.ServerView; import io.druid.client.ServerView;
import io.druid.client.cache.MapCache;
import io.druid.data.input.Firehose; import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory; import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow; import io.druid.data.input.InputRow;
@ -67,6 +68,7 @@ import io.druid.indexing.common.task.AbstractFixedIntervalTask;
import io.druid.indexing.common.task.IndexTask; import io.druid.indexing.common.task.IndexTask;
import io.druid.indexing.common.task.KillTask; import io.druid.indexing.common.task.KillTask;
import io.druid.indexing.common.task.RealtimeIndexTask; import io.druid.indexing.common.task.RealtimeIndexTask;
import io.druid.indexing.common.task.RealtimeIndexTaskTest;
import io.druid.indexing.common.task.Task; import io.druid.indexing.common.task.Task;
import io.druid.indexing.common.task.TaskResource; import io.druid.indexing.common.task.TaskResource;
import io.druid.indexing.overlord.config.TaskQueueConfig; import io.druid.indexing.overlord.config.TaskQueueConfig;
@ -94,6 +96,7 @@ import io.druid.segment.loading.SegmentLoaderLocalCacheManager;
import io.druid.segment.loading.SegmentLoadingException; import io.druid.segment.loading.SegmentLoadingException;
import io.druid.segment.loading.StorageLocationConfig; import io.druid.segment.loading.StorageLocationConfig;
import io.druid.segment.realtime.FireDepartment; import io.druid.segment.realtime.FireDepartment;
import io.druid.segment.realtime.FireDepartmentTest;
import io.druid.server.coordination.DataSegmentAnnouncer; import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.server.coordination.DruidServerMetadata; import io.druid.server.coordination.DruidServerMetadata;
import io.druid.timeline.DataSegment; import io.druid.timeline.DataSegment;
@ -484,7 +487,9 @@ public class TaskLifecycleTest
), ),
MAPPER, MAPPER,
INDEX_MERGER, INDEX_MERGER,
INDEX_IO INDEX_IO,
MapCache.create(0),
FireDepartmentTest.NO_CACHE_CONFIG
); );
tr = new ThreadPoolTaskRunner(tb, null); tr = new ThreadPoolTaskRunner(tb, null);
tq = new TaskQueue(tqc, ts, tr, tac, tl, emitter); tq = new TaskQueue(tqc, ts, tr, tac, tl, emitter);

View File

@ -156,7 +156,9 @@ public class WorkerTaskMonitorTest
), ),
jsonMapper, jsonMapper,
indexMerger, indexMerger,
indexIO indexIO,
null,
null
), ),
null null
), ),

View File

@ -36,11 +36,11 @@ public class TestHelper
private static final IndexMerger INDEX_MERGER; private static final IndexMerger INDEX_MERGER;
private static final IndexMaker INDEX_MAKER; private static final IndexMaker INDEX_MAKER;
private static final IndexIO INDEX_IO; private static final IndexIO INDEX_IO;
public static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();
static { static {
ObjectMapper jsonMapper = new DefaultObjectMapper();
INDEX_IO = new IndexIO( INDEX_IO = new IndexIO(
jsonMapper, JSON_MAPPER,
new ColumnConfig() new ColumnConfig()
{ {
@Override @Override
@ -50,10 +50,11 @@ public class TestHelper
} }
} }
); );
INDEX_MERGER = new IndexMerger(jsonMapper, INDEX_IO); INDEX_MERGER = new IndexMerger(JSON_MAPPER, INDEX_IO);
INDEX_MAKER = new IndexMaker(jsonMapper, INDEX_IO); INDEX_MAKER = new IndexMaker(JSON_MAPPER, INDEX_IO);
} }
public static IndexMerger getTestIndexMerger() public static IndexMerger getTestIndexMerger()
{ {
return INDEX_MERGER; return INDEX_MERGER;
@ -69,6 +70,10 @@ public class TestHelper
return INDEX_IO; return INDEX_IO;
} }
public static ObjectMapper getObjectMapper() {
return JSON_MAPPER;
}
public static <T> void assertExpectedResults(Iterable<Result<T>> expectedResults, Sequence<Result<T>> results) public static <T> void assertExpectedResults(Iterable<Result<T>> expectedResults, Sequence<Result<T>> results)
{ {
assertResults(expectedResults, Sequences.toList(results, Lists.<Result<T>>newArrayList()), ""); assertResults(expectedResults, Sequences.toList(results, Lists.<Result<T>>newArrayList()), "");

View File

@ -17,11 +17,14 @@
package io.druid.segment.realtime.plumber; package io.druid.segment.realtime.plumber;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.metamx.common.Granularity; import com.metamx.common.Granularity;
import com.metamx.common.concurrent.ScheduledExecutors; import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceEmitter;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.common.guava.ThreadRenamingCallable; import io.druid.common.guava.ThreadRenamingCallable;
import io.druid.concurrent.Execs; import io.druid.concurrent.Execs;
import io.druid.query.QueryRunnerFactoryConglomerate; import io.druid.query.QueryRunnerFactoryConglomerate;
@ -63,7 +66,11 @@ public class FlushingPlumber extends RealtimePlumber
DataSegmentAnnouncer segmentAnnouncer, DataSegmentAnnouncer segmentAnnouncer,
ExecutorService queryExecutorService, ExecutorService queryExecutorService,
IndexMerger indexMerger, IndexMerger indexMerger,
IndexIO indexIO IndexIO indexIO,
Cache cache,
CacheConfig cacheConfig,
ObjectMapper objectMapper
) )
{ {
super( super(
@ -78,7 +85,10 @@ public class FlushingPlumber extends RealtimePlumber
null, null,
null, null,
indexMerger, indexMerger,
indexIO indexIO,
cache,
cacheConfig,
objectMapper
); );
this.flushDuration = flushDuration; this.flushDuration = flushDuration;

View File

@ -20,8 +20,11 @@ package io.druid.segment.realtime.plumber;
import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceEmitter;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.guice.annotations.Processing; import io.druid.guice.annotations.Processing;
import io.druid.query.QueryRunnerFactoryConglomerate; import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.segment.IndexIO; import io.druid.segment.IndexIO;
@ -50,6 +53,9 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool
private final ExecutorService queryExecutorService; private final ExecutorService queryExecutorService;
private final IndexMerger indexMerger; private final IndexMerger indexMerger;
private final IndexIO indexIO; private final IndexIO indexIO;
private final Cache cache;
private final CacheConfig cacheConfig;
private final ObjectMapper objectMapper;
@JsonCreator @JsonCreator
public FlushingPlumberSchool( public FlushingPlumberSchool(
@ -59,7 +65,10 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool
@JacksonInject DataSegmentAnnouncer segmentAnnouncer, @JacksonInject DataSegmentAnnouncer segmentAnnouncer,
@JacksonInject @Processing ExecutorService queryExecutorService, @JacksonInject @Processing ExecutorService queryExecutorService,
@JacksonInject IndexMerger indexMerger, @JacksonInject IndexMerger indexMerger,
@JacksonInject IndexIO indexIO @JacksonInject IndexIO indexIO,
@JacksonInject Cache cache,
@JacksonInject CacheConfig cacheConfig,
@JacksonInject ObjectMapper objectMapper
) )
{ {
super( super(
@ -71,7 +80,10 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool
null, null,
queryExecutorService, queryExecutorService,
indexMerger, indexMerger,
indexIO indexIO,
cache,
cacheConfig,
objectMapper
); );
this.flushDuration = flushDuration == null ? defaultFlushDuration : flushDuration; this.flushDuration = flushDuration == null ? defaultFlushDuration : flushDuration;
@ -81,6 +93,9 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool
this.queryExecutorService = queryExecutorService; this.queryExecutorService = queryExecutorService;
this.indexMerger = Preconditions.checkNotNull(indexMerger, "Null IndexMerger"); this.indexMerger = Preconditions.checkNotNull(indexMerger, "Null IndexMerger");
this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO"); this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO");
this.cache = cache;
this.cacheConfig = cacheConfig;
this.objectMapper = objectMapper;
} }
@Override @Override
@ -102,7 +117,10 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool
segmentAnnouncer, segmentAnnouncer,
queryExecutorService, queryExecutorService,
indexMerger, indexMerger,
indexIO indexIO,
cache,
cacheConfig,
objectMapper
); );
} }

View File

@ -19,6 +19,7 @@
package io.druid.segment.realtime.plumber; package io.druid.segment.realtime.plumber;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
@ -40,8 +41,11 @@ import com.metamx.common.guava.FunctionalIterable;
import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent; import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.client.CachingQueryRunner;
import io.druid.client.FilteredServerView; import io.druid.client.FilteredServerView;
import io.druid.client.ServerView; import io.druid.client.ServerView;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.common.guava.ThreadRenamingCallable; import io.druid.common.guava.ThreadRenamingCallable;
import io.druid.common.guava.ThreadRenamingRunnable; import io.druid.common.guava.ThreadRenamingRunnable;
import io.druid.common.utils.VMUtils; import io.druid.common.utils.VMUtils;
@ -63,6 +67,7 @@ import io.druid.segment.IndexMerger;
import io.druid.segment.IndexSpec; import io.druid.segment.IndexSpec;
import io.druid.segment.QueryableIndex; import io.druid.segment.QueryableIndex;
import io.druid.segment.QueryableIndexSegment; import io.druid.segment.QueryableIndexSegment;
import io.druid.segment.ReferenceCountingSegment;
import io.druid.segment.Segment; import io.druid.segment.Segment;
import io.druid.segment.incremental.IndexSizeExceededException; import io.druid.segment.incremental.IndexSizeExceededException;
import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.DataSchema;
@ -120,6 +125,10 @@ public class RealtimePlumber implements Plumber
String.CASE_INSENSITIVE_ORDER String.CASE_INSENSITIVE_ORDER
); );
private final Cache cache;
private final CacheConfig cacheConfig;
private final ObjectMapper objectMapper;
private volatile long nextFlush = 0; private volatile long nextFlush = 0;
private volatile boolean shuttingDown = false; private volatile boolean shuttingDown = false;
private volatile boolean stopped = false; private volatile boolean stopped = false;
@ -145,7 +154,10 @@ public class RealtimePlumber implements Plumber
SegmentPublisher segmentPublisher, SegmentPublisher segmentPublisher,
FilteredServerView serverView, FilteredServerView serverView,
IndexMerger indexMerger, IndexMerger indexMerger,
IndexIO indexIO IndexIO indexIO,
Cache cache,
CacheConfig cacheConfig,
ObjectMapper objectMapper
) )
{ {
this.schema = schema; this.schema = schema;
@ -161,6 +173,13 @@ public class RealtimePlumber implements Plumber
this.serverView = serverView; this.serverView = serverView;
this.indexMerger = Preconditions.checkNotNull(indexMerger, "Null IndexMerger"); this.indexMerger = Preconditions.checkNotNull(indexMerger, "Null IndexMerger");
this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO"); this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO");
this.cache = cache;
this.cacheConfig = cacheConfig;
this.objectMapper = objectMapper;
if(!cache.isLocal()) {
log.error("Configured cache is not local, caching will not be enabled");
}
log.info("Creating plumber using rejectionPolicy[%s]", getRejectionPolicy()); log.info("Creating plumber using rejectionPolicy[%s]", getRejectionPolicy());
} }
@ -323,9 +342,25 @@ public class RealtimePlumber implements Plumber
} }
// Prevent the underlying segment from closing when its being iterated // Prevent the underlying segment from closing when its being iterated
final Closeable closeable = input.getSegment().increment(); final ReferenceCountingSegment segment = input.getSegment();
final Closeable closeable = segment.increment();
try { try {
return factory.createRunner(input.getSegment()); if (input.hasSwapped() // only use caching if data is immutable
&& cache.isLocal() // hydrants may not be in sync between replicas, make sure cache is local
) {
return new CachingQueryRunner<>(
makeHydrantIdentifier(input, segment),
descriptor,
objectMapper,
cache,
toolchest,
factory.createRunner(segment),
MoreExecutors.sameThreadExecutor(),
cacheConfig
);
} else {
return factory.createRunner(input.getSegment());
}
} }
finally { finally {
try { try {
@ -353,6 +388,11 @@ public class RealtimePlumber implements Plumber
); );
} }
protected static String makeHydrantIdentifier(FireHydrant input, ReferenceCountingSegment segment)
{
return segment.getIdentifier() + "_" + input.getCount();
}
@Override @Override
public void persist(final Committer committer) public void persist(final Committer committer)
{ {
@ -901,6 +941,9 @@ public class RealtimePlumber implements Plumber
sink.getVersion(), sink.getVersion(),
new SingleElementPartitionChunk<>(sink) new SingleElementPartitionChunk<>(sink)
); );
for (FireHydrant hydrant : sink) {
cache.close(makeHydrantIdentifier(hydrant, hydrant.getSegment()));
}
synchronized (handoffCondition) { synchronized (handoffCondition) {
handoffCondition.notifyAll(); handoffCondition.notifyAll();
} }

View File

@ -19,9 +19,12 @@ package io.druid.segment.realtime.plumber;
import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceEmitter;
import io.druid.client.FilteredServerView; import io.druid.client.FilteredServerView;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.guice.annotations.Processing; import io.druid.guice.annotations.Processing;
import io.druid.query.QueryRunnerFactoryConglomerate; import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.segment.IndexIO; import io.druid.segment.IndexIO;
@ -48,6 +51,9 @@ public class RealtimePlumberSchool implements PlumberSchool
private final ExecutorService queryExecutorService; private final ExecutorService queryExecutorService;
private final IndexMerger indexMerger; private final IndexMerger indexMerger;
private final IndexIO indexIO; private final IndexIO indexIO;
private final Cache cache;
private final CacheConfig cacheConfig;
private final ObjectMapper objectMapper;
@JsonCreator @JsonCreator
public RealtimePlumberSchool( public RealtimePlumberSchool(
@ -59,8 +65,11 @@ public class RealtimePlumberSchool implements PlumberSchool
@JacksonInject FilteredServerView serverView, @JacksonInject FilteredServerView serverView,
@JacksonInject @Processing ExecutorService executorService, @JacksonInject @Processing ExecutorService executorService,
@JacksonInject IndexMerger indexMerger, @JacksonInject IndexMerger indexMerger,
@JacksonInject IndexIO indexIO @JacksonInject IndexIO indexIO,
) @JacksonInject Cache cache,
@JacksonInject CacheConfig cacheConfig,
@JacksonInject ObjectMapper objectMapper
)
{ {
this.emitter = emitter; this.emitter = emitter;
this.conglomerate = conglomerate; this.conglomerate = conglomerate;
@ -71,6 +80,10 @@ public class RealtimePlumberSchool implements PlumberSchool
this.queryExecutorService = executorService; this.queryExecutorService = executorService;
this.indexMerger = Preconditions.checkNotNull(indexMerger, "Null IndexMerger"); this.indexMerger = Preconditions.checkNotNull(indexMerger, "Null IndexMerger");
this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO"); this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO");
this.cache = cache;
this.cacheConfig = cacheConfig;
this.objectMapper = objectMapper;
} }
@Override @Override
@ -94,7 +107,10 @@ public class RealtimePlumberSchool implements PlumberSchool
segmentPublisher, segmentPublisher,
serverView, serverView,
indexMerger, indexMerger,
indexIO indexIO,
cache,
cacheConfig,
objectMapper
); );
} }

View File

@ -20,6 +20,8 @@ package io.druid.segment.realtime;
import com.fasterxml.jackson.databind.InjectableValues; import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.metamx.common.Granularity; import com.metamx.common.Granularity;
import io.druid.client.cache.CacheConfig;
import io.druid.client.cache.MapCache;
import io.druid.data.input.impl.DimensionsSpec; import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.JSONParseSpec; import io.druid.data.input.impl.JSONParseSpec;
import io.druid.data.input.impl.StringInputRowParser; import io.druid.data.input.impl.StringInputRowParser;
@ -44,6 +46,22 @@ import java.util.Map;
*/ */
public class FireDepartmentTest public class FireDepartmentTest
{ {
public static final CacheConfig NO_CACHE_CONFIG = new CacheConfig()
{
@Override
public boolean isPopulateCache()
{
return false;
}
@Override
public boolean isUseCache()
{
return false;
}
};
@Test @Test
public void testSerde() throws Exception public void testSerde() throws Exception
{ {
@ -87,7 +105,11 @@ public class FireDepartmentTest
null, null,
null, null,
TestHelper.getTestIndexMerger(), TestHelper.getTestIndexMerger(),
TestHelper.getTestIndexIO() TestHelper.getTestIndexIO(),
MapCache.create(0),
NO_CACHE_CONFIG,
TestHelper.getObjectMapper()
), ),
null null
), ),

View File

@ -30,6 +30,7 @@ import com.metamx.common.Granularity;
import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceEmitter;
import io.druid.client.FilteredServerView; import io.druid.client.FilteredServerView;
import io.druid.client.ServerView; import io.druid.client.ServerView;
import io.druid.client.cache.MapCache;
import io.druid.data.input.Committer; import io.druid.data.input.Committer;
import io.druid.data.input.InputRow; import io.druid.data.input.InputRow;
import io.druid.data.input.Row; import io.druid.data.input.Row;
@ -50,6 +51,7 @@ import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.indexing.granularity.UniformGranularitySpec; import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.segment.loading.DataSegmentPusher; import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.realtime.FireDepartmentMetrics; import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.FireDepartmentTest;
import io.druid.segment.realtime.FireHydrant; import io.druid.segment.realtime.FireHydrant;
import io.druid.segment.realtime.SegmentPublisher; import io.druid.segment.realtime.SegmentPublisher;
import io.druid.server.coordination.DataSegmentAnnouncer; import io.druid.server.coordination.DataSegmentAnnouncer;
@ -193,7 +195,10 @@ public class RealtimePlumberSchoolTest
serverView, serverView,
MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor(),
TestHelper.getTestIndexMerger(), TestHelper.getTestIndexMerger(),
TestHelper.getTestIndexIO() TestHelper.getTestIndexIO(),
MapCache.create(0),
FireDepartmentTest.NO_CACHE_CONFIG,
TestHelper.getObjectMapper()
); );
metrics = new FireDepartmentMetrics(); metrics = new FireDepartmentMetrics();

View File

@ -30,7 +30,9 @@ import com.metamx.common.logger.Logger;
import io.airlift.airline.Arguments; import io.airlift.airline.Arguments;
import io.airlift.airline.Command; import io.airlift.airline.Command;
import io.airlift.airline.Option; import io.airlift.airline.Option;
import io.druid.client.cache.CacheConfig;
import io.druid.guice.Binders; import io.druid.guice.Binders;
import io.druid.guice.CacheModule;
import io.druid.guice.IndexingServiceFirehoseModule; import io.druid.guice.IndexingServiceFirehoseModule;
import io.druid.guice.Jerseys; import io.druid.guice.Jerseys;
import io.druid.guice.JsonConfigProvider; import io.druid.guice.JsonConfigProvider;
@ -156,6 +158,9 @@ public class CliPeon extends GuiceRunnable
binder.bind(QuerySegmentWalker.class).to(ThreadPoolTaskRunner.class); binder.bind(QuerySegmentWalker.class).to(ThreadPoolTaskRunner.class);
binder.bind(ThreadPoolTaskRunner.class).in(ManageLifecycle.class); binder.bind(ThreadPoolTaskRunner.class).in(ManageLifecycle.class);
JsonConfigProvider.bind(binder, "druid.realtime.cache", CacheConfig.class);
binder.install(new CacheModule());
// Override the default SegmentLoaderConfig because we don't actually care about the // Override the default SegmentLoaderConfig because we don't actually care about the
// configuration based locations. This will override them anyway. This is also stopping // configuration based locations. This will override them anyway. This is also stopping
// configuration of other parameters, but I don't think that's actually a problem. // configuration of other parameters, but I don't think that's actually a problem.

View File

@ -23,6 +23,7 @@ import com.google.inject.Module;
import com.google.inject.TypeLiteral; import com.google.inject.TypeLiteral;
import com.google.inject.multibindings.MapBinder; import com.google.inject.multibindings.MapBinder;
import io.druid.cli.QueryJettyServerInitializer; import io.druid.cli.QueryJettyServerInitializer;
import io.druid.client.cache.CacheConfig;
import io.druid.metadata.MetadataSegmentPublisher; import io.druid.metadata.MetadataSegmentPublisher;
import io.druid.query.QuerySegmentWalker; import io.druid.query.QuerySegmentWalker;
import io.druid.segment.realtime.FireDepartment; import io.druid.segment.realtime.FireDepartment;
@ -79,6 +80,9 @@ public class RealtimeModule implements Module
.toProvider(FireDepartmentsProvider.class) .toProvider(FireDepartmentsProvider.class)
.in(LazySingleton.class); .in(LazySingleton.class);
JsonConfigProvider.bind(binder, "druid.realtime.cache", CacheConfig.class);
binder.install(new CacheModule());
binder.bind(QuerySegmentWalker.class).to(RealtimeManager.class).in(ManageLifecycle.class); binder.bind(QuerySegmentWalker.class).to(RealtimeManager.class).in(ManageLifecycle.class);
binder.bind(NodeTypeConfig.class).toInstance(new NodeTypeConfig("realtime")); binder.bind(NodeTypeConfig.class).toInstance(new NodeTypeConfig("realtime"));
binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class).in(LazySingleton.class); binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class).in(LazySingleton.class);