Cache: Add maxEntrySize config, make groupBy cacheable by default. (#5108)

* Cache: Add maxEntrySize config.

The idea is this makes it more feasible to cache query types that
can potentially generate large result sets, like groupBy and select,
without fear of writing too much to the cache per query.

Includes a refactor of cache population code in CachingQueryRunner and
CachingClusteredClient, such that they now use the same CachePopulator
interface with two implementations: one for foreground and one for
background.

The main reason for splitting the foreground / background impls is
that the foreground impl can have a more effective implementation of
maxEntrySize. It can stop retaining subvalues for the cache early.

* Add CachePopulatorStats.

* Fix whitespace.

* Fix docs.

* Fix various tests.

* Add tests.

* Fix tests.

* Better tests

* Remove conflict markers.

* Fix licenses.
This commit is contained in:
Gian Merlino 2018-08-07 10:23:15 -07:00 committed by GitHub
parent 56ab4363ea
commit 3525d4059e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
53 changed files with 827 additions and 274 deletions

View File

@ -113,8 +113,9 @@ You can optionally only configure caching to be enabled on the broker by setting
|`druid.broker.cache.useResultLevelCache`|true, false|Enable result level caching on the broker.|false|
|`druid.broker.cache.populateResultLevelCache`|true, false|Populate the result level cache on the broker.|false|
|`druid.broker.cache.resultLevelCacheLimit`|positive integer|Maximum size of query response that can be cached.|`Integer.MAX_VALUE`|
|`druid.broker.cache.unCacheable`|All druid query types|All query types to not cache.|`["groupBy", "select"]`|
|`druid.broker.cache.unCacheable`|All druid query types|All query types to not cache.|`["select"]`|
|`druid.broker.cache.cacheBulkMergeLimit`|positive integer or 0|Queries with more segments than this number will not attempt to fetch from cache at the broker level, leaving potential caching fetches (and cache result merging) to the historicals|`Integer.MAX_VALUE`|
|`druid.broker.cache.maxEntrySize`|positive integer or -1|Maximum size of an individual cache entry (processed results for one segment), in bytes, or -1 for unlimited.|`1000000` (1MB)|
See [cache configuration](caching.html) for how to configure cache settings.

View File

@ -97,7 +97,8 @@ You can optionally only configure caching to be enabled on the historical by set
|--------|---------------|-----------|-------|
|`druid.historical.cache.useCache`|true, false|Enable the cache on the historical.|false|
|`druid.historical.cache.populateCache`|true, false|Populate the cache on the historical.|false|
|`druid.historical.cache.unCacheable`|All druid query types|All query types to not cache.|["groupBy", "select"]|
|`druid.historical.cache.unCacheable`|All druid query types|All query types to not cache.|["select"]|
|`druid.historical.cache.maxEntrySize`|positive integer or -1|Maximum size of an individual cache entry (processed results for one segment), in bytes, or -1 for unlimited.|`1000000` (1MB)|
See [cache configuration](caching.html) for how to configure cache settings.

View File

@ -353,16 +353,21 @@ You can enable caching of results at the broker, historical, or realtime level u
|<code>druid.(broker&#124;historical&#124;realtime).cache.unCacheable</code>|All druid query types|All query types to not cache.|["groupBy", "select"]|
|<code>druid.(broker&#124;historical&#124;realtime).cache.useCache</code>|true, false|Whether to use cache for getting query results.|false|
|<code>druid.(broker&#124;historical&#124;realtime).cache.populateCache</code>|true, false|Whether to populate cache.|false|
|<code>druid.(broker&#124;historical&#124;realtime).cache.maxEntrySize</code>|positive integer or -1|Maximum size of an individual cache entry (processed results for one segment), in bytes, or -1 for unlimited.|`-1`|
#### Local Cache
<div class="note caution">
DEPRECATED: Use caffeine instead
</div>
|Property|Description|Default|
|--------|-----------|-------|
|`druid.cache.sizeInBytes`|Maximum cache size in bytes. You must set this if you enabled populateCache/useCache, or else cache size of zero wouldn't really cache anything.|0|
|`druid.cache.initialSize`|Initial size of the hashtable backing the cache.|500000|
|`druid.cache.logEvictionCount`|If non-zero, log cache eviction every `logEvictionCount` items.|0|
#### Memcache
#### Memcached
|Property|Description|Default|
|--------|-----------|-------|
@ -372,6 +377,22 @@ You can enable caching of results at the broker, historical, or realtime level u
|`druid.cache.maxObjectSize`|Maximum object size in bytes for a Memcached object.|52428800 (50 MB)|
|`druid.cache.memcachedPrefix`|Key prefix for all keys in Memcached.|druid|
#### Caffeine Cache
A highly performant local cache implementation for Druid based on [Caffeine](https://github.com/ben-manes/caffeine). Requires a JRE8u60 or higher if using `COMMON_FJP`.
Below are the configuration options known to this module:
|`runtime.properties`|Description|Default|
|--------------------|-----------|-------|
|`druid.cache.type`| Set this to `caffeine`|`local`|
|`druid.cache.sizeInBytes`|The maximum size of the cache in bytes on heap.|None (unlimited)|
|`druid.cache.expireAfter`|The time (in ms) after an access for which a cache entry may be expired|None (no time limit)|
|`druid.cache.cacheExecutorFactory`|The executor factory to use for Caffeine maintenance. One of `COMMON_FJP`, `SINGLE_THREAD`, or `SAME_THREAD`|ForkJoinPool common pool (`COMMON_FJP`)|
|`druid.cache.evictOnClose`|If a close of a namespace (ex: removing a segment from a node) should cause an eager eviction of associated cache values|`false`|
See the [Caching documentation](caching.html) for more detail.
### Indexing Service Discovery
This config is used to find the [Indexing Service](../design/indexing-service.html) using Curator service discovery. Only required if you are actually running an indexing service.

View File

@ -72,6 +72,7 @@ You can optionally configure caching to be enabled on the realtime node by setti
|--------|---------------|-----------|-------|
|`druid.realtime.cache.useCache`|true, false|Enable the cache on the realtime.|false|
|`druid.realtime.cache.populateCache`|true, false|Populate the cache on the realtime.|false|
|`druid.realtime.cache.unCacheable`|All druid query types|All query types to not cache.|`["groupBy", "select"]`|
|`druid.realtime.cache.unCacheable`|All druid query types|All query types to not cache.|`["select"]`|
|`druid.realtime.cache.maxEntrySize`|positive integer or -1|Maximum size of an individual cache entry (processed results for one segment), in bytes, or -1 for unlimited.|`1000000` (1MB)|
See [cache configuration](caching.html) for how to configure cache settings.

View File

@ -88,6 +88,9 @@ Available Metrics
|`*/averageByte`|Average cache entry byte size.||Varies.|
|`*/timeouts`|Number of cache timeouts.||0|
|`*/errors`|Number of cache errors.||0|
|`*/put/ok`|Number of new cache entries successfully cached.||Varies, but more than zero.|
|`*/put/error`|Number of new cache entries that could not be cached due to errors.||Varies, but more than zero.|
|`*/put/oversized`|Number of potential new cache entries that were skipped due to being too large (based on `druid.{broker,historical,realtime}.cache.maxEntrySize` properties).||Varies.|
#### Memcached only metrics

View File

@ -247,7 +247,8 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
toolbox.getEmitter(),
toolbox.getQueryExecutorService(),
toolbox.getCache(),
toolbox.getCacheConfig()
toolbox.getCacheConfig(),
toolbox.getCachePopulatorStats()
);
}

View File

@ -37,6 +37,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.druid.client.cache.CacheConfig;
import io.druid.client.cache.CachePopulatorStats;
import io.druid.client.cache.MapCache;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.FloatDimensionSchema;
@ -2147,6 +2148,7 @@ public class KafkaIndexTaskTest
testUtils.getTestIndexIO(),
MapCache.create(1024),
new CacheConfig(),
new CachePopulatorStats(),
testUtils.getTestIndexMergerV9(),
EasyMock.createNiceMock(DruidNodeAnnouncer.class),
EasyMock.createNiceMock(DruidNode.class),

View File

@ -27,6 +27,7 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import com.google.inject.Provider;
import io.druid.client.cache.CachePopulatorStats;
import io.druid.java.util.emitter.service.ServiceEmitter;
import io.druid.java.util.metrics.MonitorScheduler;
import io.druid.client.cache.Cache;
@ -89,6 +90,7 @@ public class TaskToolbox
private final IndexIO indexIO;
private final Cache cache;
private final CacheConfig cacheConfig;
private final CachePopulatorStats cachePopulatorStats;
private final IndexMergerV9 indexMergerV9;
private final TaskReportFileWriter taskReportFileWriter;
@ -117,6 +119,7 @@ public class TaskToolbox
IndexIO indexIO,
Cache cache,
CacheConfig cacheConfig,
CachePopulatorStats cachePopulatorStats,
IndexMergerV9 indexMergerV9,
DruidNodeAnnouncer druidNodeAnnouncer,
DruidNode druidNode,
@ -144,6 +147,7 @@ public class TaskToolbox
this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO");
this.cache = cache;
this.cacheConfig = cacheConfig;
this.cachePopulatorStats = cachePopulatorStats;
this.indexMergerV9 = Preconditions.checkNotNull(indexMergerV9, "Null IndexMergerV9");
this.druidNodeAnnouncer = druidNodeAnnouncer;
this.druidNode = druidNode;
@ -268,6 +272,11 @@ public class TaskToolbox
return cacheConfig;
}
public CachePopulatorStats getCachePopulatorStats()
{
return cachePopulatorStats;
}
public IndexMergerV9 getIndexMergerV9()
{
return indexMergerV9;

View File

@ -23,10 +23,9 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import com.google.inject.Provider;
import io.druid.java.util.emitter.service.ServiceEmitter;
import io.druid.java.util.metrics.MonitorScheduler;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.client.cache.CachePopulatorStats;
import io.druid.discovery.DataNodeService;
import io.druid.discovery.DruidNodeAnnouncer;
import io.druid.discovery.LookupNodeService;
@ -35,6 +34,8 @@ import io.druid.guice.annotations.RemoteChatHandler;
import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.task.Task;
import io.druid.java.util.emitter.service.ServiceEmitter;
import io.druid.java.util.metrics.MonitorScheduler;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMergerV9;
@ -73,6 +74,7 @@ public class TaskToolboxFactory
private final IndexIO indexIO;
private final Cache cache;
private final CacheConfig cacheConfig;
private final CachePopulatorStats cachePopulatorStats;
private final IndexMergerV9 indexMergerV9;
private final DruidNodeAnnouncer druidNodeAnnouncer;
private final DruidNode druidNode;
@ -100,6 +102,7 @@ public class TaskToolboxFactory
IndexIO indexIO,
Cache cache,
CacheConfig cacheConfig,
CachePopulatorStats cachePopulatorStats,
IndexMergerV9 indexMergerV9,
DruidNodeAnnouncer druidNodeAnnouncer,
@RemoteChatHandler DruidNode druidNode,
@ -126,6 +129,7 @@ public class TaskToolboxFactory
this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO");
this.cache = cache;
this.cacheConfig = cacheConfig;
this.cachePopulatorStats = cachePopulatorStats;
this.indexMergerV9 = indexMergerV9;
this.druidNodeAnnouncer = druidNodeAnnouncer;
this.druidNode = druidNode;
@ -157,6 +161,7 @@ public class TaskToolboxFactory
indexIO,
cache,
cacheConfig,
cachePopulatorStats,
indexMergerV9,
druidNodeAnnouncer,
druidNode,

View File

@ -706,7 +706,8 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements
toolbox.getEmitter(),
toolbox.getQueryExecutorService(),
toolbox.getCache(),
toolbox.getCacheConfig()
toolbox.getCacheConfig(),
toolbox.getCachePopulatorStats()
);
}

View File

@ -341,6 +341,7 @@ public class RealtimeIndexTask extends AbstractTask
toolbox.getIndexIO(),
toolbox.getCache(),
toolbox.getCacheConfig(),
toolbox.getCachePopulatorStats(),
toolbox.getObjectMapper()
);

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.client.cache.CachePopulatorStats;
import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.task.NoopTestTaskFileWriter;
@ -111,6 +112,7 @@ public class TaskToolboxTest
mockIndexIO,
mockCache,
mockCacheConfig,
new CachePopulatorStats(),
mockIndexMergerV9,
null,
null,

View File

@ -30,6 +30,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.druid.client.cache.CacheConfig;
import io.druid.client.cache.CachePopulatorStats;
import io.druid.client.cache.MapCache;
import io.druid.common.config.NullHandling;
import io.druid.data.input.Firehose;
@ -1500,6 +1501,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest
testUtils.getTestIndexIO(),
MapCache.create(1024),
new CacheConfig(),
new CachePopulatorStats(),
testUtils.getTestIndexMergerV9(),
EasyMock.createNiceMock(DruidNodeAnnouncer.class),
EasyMock.createNiceMock(DruidNode.class),

View File

@ -586,6 +586,7 @@ public class CompactionTaskTest
indexIO,
null,
null,
null,
new IndexMergerV9(objectMapper, indexIO, OffHeapMemorySegmentWriteOutMediumFactory.instance()),
null,
null,

View File

@ -1514,6 +1514,7 @@ public class IndexTaskTest
indexIO,
null,
null,
null,
indexMergerV9,
null,
null,

View File

@ -31,6 +31,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.druid.client.cache.CacheConfig;
import io.druid.client.cache.CachePopulatorStats;
import io.druid.client.cache.MapCache;
import io.druid.common.config.NullHandling;
import io.druid.data.input.Firehose;
@ -1080,6 +1081,7 @@ public class RealtimeIndexTaskTest
testUtils.getTestIndexIO(),
MapCache.create(1024),
new CacheConfig(),
new CachePopulatorStats(),
testUtils.getTestIndexMergerV9(),
EasyMock.createNiceMock(DruidNodeAnnouncer.class),
EasyMock.createNiceMock(DruidNode.class),

View File

@ -252,6 +252,7 @@ public class SameIntervalMergeTaskTest
indexIO,
null,
null,
null,
EasyMock.createMock(IndexMergerV9.class),
null,
null,

View File

@ -316,6 +316,7 @@ public class IngestSegmentFirehoseFactoryTest
INDEX_IO,
null,
null,
null,
INDEX_MERGER_V9,
null,
null,

View File

@ -340,6 +340,7 @@ public class IngestSegmentFirehoseFactoryTimelineTest
INDEX_IO,
null,
null,
null,
INDEX_MERGER_V9,
null,
null,

View File

@ -99,6 +99,7 @@ public class SingleTaskBackgroundRunnerTest
utils.getTestIndexIO(),
null,
null,
null,
utils.getTestIndexMergerV9(),
null,
node,

View File

@ -34,6 +34,7 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.common.util.concurrent.MoreExecutors;
import io.druid.client.cache.CachePopulatorStats;
import io.druid.client.cache.MapCache;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
@ -616,6 +617,7 @@ public class TaskLifecycleTest
INDEX_IO,
MapCache.create(0),
FireDepartmentTest.NO_CACHE_CONFIG,
new CachePopulatorStats(),
INDEX_MERGER_V9,
EasyMock.createNiceMock(DruidNodeAnnouncer.class),
EasyMock.createNiceMock(DruidNode.class),

View File

@ -125,6 +125,7 @@ public class WorkerTaskManagerTest
indexIO,
null,
null,
null,
indexMergerV9,
null,
null,

View File

@ -187,6 +187,7 @@ public class WorkerTaskMonitorTest
indexIO,
null,
null,
null,
indexMergerV9,
null,
null,

View File

@ -19,9 +19,6 @@
package io.druid.client;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.java.util.common.StringUtils;
@ -31,8 +28,6 @@ import io.druid.query.QueryContexts;
import io.druid.query.SegmentDescriptor;
import org.joda.time.Interval;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
public class CacheUtil
@ -57,23 +52,6 @@ public class CacheUtil
);
}
public static void populate(Cache cache, ObjectMapper mapper, Cache.NamedKey key, Iterable<Object> results)
{
try {
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
try (JsonGenerator gen = mapper.getFactory().createGenerator(bytes)) {
for (Object result : results) {
gen.writeObject(result);
}
}
cache.put(key, bytes.toByteArray());
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}
public static <T> boolean useCacheOnBrokers(
Query<T> query,
CacheStrategy<T, Object, Query<T>> strategy,

View File

@ -32,17 +32,12 @@ import com.google.common.collect.RangeSet;
import com.google.common.collect.Sets;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.client.cache.CachePopulator;
import io.druid.client.selector.QueryableDruidServer;
import io.druid.client.selector.ServerSelector;
import io.druid.guice.annotations.BackgroundCaching;
import io.druid.guice.annotations.Smile;
import io.druid.java.util.common.Intervals;
import io.druid.java.util.common.Pair;
@ -88,8 +83,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
@ -102,8 +95,8 @@ public class CachingClusteredClient implements QuerySegmentWalker
private final TimelineServerView serverView;
private final Cache cache;
private final ObjectMapper objectMapper;
private final CachePopulator cachePopulator;
private final CacheConfig cacheConfig;
private final ListeningExecutorService backgroundExecutorService;
@Inject
public CachingClusteredClient(
@ -111,7 +104,7 @@ public class CachingClusteredClient implements QuerySegmentWalker
TimelineServerView serverView,
Cache cache,
@Smile ObjectMapper objectMapper,
@BackgroundCaching ExecutorService backgroundExecutorService,
CachePopulator cachePopulator,
CacheConfig cacheConfig
)
{
@ -119,13 +112,13 @@ public class CachingClusteredClient implements QuerySegmentWalker
this.serverView = serverView;
this.cache = cache;
this.objectMapper = objectMapper;
this.cachePopulator = cachePopulator;
this.cacheConfig = cacheConfig;
this.backgroundExecutorService = MoreExecutors.listeningDecorator(backgroundExecutorService);
if (cacheConfig.isQueryCacheable(Query.GROUP_BY)) {
if (cacheConfig.isQueryCacheable(Query.GROUP_BY) && (cacheConfig.isUseCache() || cacheConfig.isPopulateCache())) {
log.warn(
"Even though groupBy caching is enabled, v2 groupBys will not be cached. "
+ "Consider disabling cache on your broker and enabling it on your data nodes to enable v2 groupBy caching."
"Even though groupBy caching is enabled in your configuration, v2 groupBys will not be cached on the broker. "
+ "Consider enabling caching on your data nodes if it is not already enabled."
);
}
@ -218,7 +211,7 @@ public class CachingClusteredClient implements QuerySegmentWalker
private final boolean isBySegment;
private final int uncoveredIntervalsLimit;
private final Query<T> downstreamQuery;
private final Map<String, CachePopulator> cachePopulatorMap = Maps.newHashMap();
private final Map<String, Cache.NamedKey> cachePopulatorKeyMap = Maps.newHashMap();
SpecificQueryRunnable(final QueryPlus<T> queryPlus, final Map<String, Object> responseContext)
{
@ -420,7 +413,7 @@ public class CachingClusteredClient implements QuerySegmentWalker
} else if (populateCache) {
// otherwise, if populating cache, add segment to list of segments to cache
final String segmentIdentifier = segment.getServer().getSegment().getIdentifier();
addCachePopulator(segmentCacheKey, segmentIdentifier, segmentQueryInterval);
addCachePopulatorKey(segmentCacheKey, segmentIdentifier, segmentQueryInterval);
}
});
return alreadyCachedResults;
@ -453,22 +446,22 @@ public class CachingClusteredClient implements QuerySegmentWalker
}
}
private void addCachePopulator(
private void addCachePopulatorKey(
Cache.NamedKey segmentCacheKey,
String segmentIdentifier,
Interval segmentQueryInterval
)
{
cachePopulatorMap.put(
cachePopulatorKeyMap.put(
StringUtils.format("%s_%s", segmentIdentifier, segmentQueryInterval),
new CachePopulator(cache, objectMapper, segmentCacheKey)
segmentCacheKey
);
}
@Nullable
private CachePopulator getCachePopulator(String segmentId, Interval segmentInterval)
private Cache.NamedKey getCachePopulatorKey(String segmentId, Interval segmentInterval)
{
return cachePopulatorMap.get(StringUtils.format("%s_%s", segmentId, segmentInterval));
return cachePopulatorKeyMap.get(StringUtils.format("%s_%s", segmentId, segmentInterval));
}
private SortedMap<DruidServer, List<SegmentDescriptor>> groupSegmentsByServer(Set<ServerToSegment> segments)
@ -601,27 +594,19 @@ public class CachingClusteredClient implements QuerySegmentWalker
responseContext
);
final Function<T, Object> cacheFn = strategy.prepareForSegmentLevelCache();
return resultsBySegments
.map(result -> {
final BySegmentResultValueClass<T> resultsOfSegment = result.getValue();
final CachePopulator cachePopulator =
getCachePopulator(resultsOfSegment.getSegmentId(), resultsOfSegment.getInterval());
Sequence<T> res = Sequences
.simple(resultsOfSegment.getResults())
.map(r -> {
if (cachePopulator != null) {
// only compute cache data if populating cache
cachePopulator.cacheFutures.add(backgroundExecutorService.submit(() -> cacheFn.apply(r)));
}
return r;
})
.map(
toolChest.makePreComputeManipulatorFn(downstreamQuery, MetricManipulatorFns.deserializing())::apply
);
if (cachePopulator != null) {
res = res.withEffect(cachePopulator::populate, MoreExecutors.sameThreadExecutor());
final Cache.NamedKey cachePopulatorKey =
getCachePopulatorKey(resultsOfSegment.getSegmentId(), resultsOfSegment.getInterval());
Sequence<T> res = Sequences.simple(resultsOfSegment.getResults());
if (cachePopulatorKey != null) {
res = cachePopulator.wrap(res, cacheFn::apply, cache, cachePopulatorKey);
}
return res;
return res.map(
toolChest.makePreComputeManipulatorFn(downstreamQuery, MetricManipulatorFns.deserializing())::apply
);
})
.flatMerge(seq -> seq, query.getResultOrdering());
}
@ -644,43 +629,4 @@ public class CachingClusteredClient implements QuerySegmentWalker
return rhs;
}
}
private class CachePopulator
{
private final Cache cache;
private final ObjectMapper mapper;
private final Cache.NamedKey key;
private final ConcurrentLinkedQueue<ListenableFuture<Object>> cacheFutures = new ConcurrentLinkedQueue<>();
CachePopulator(Cache cache, ObjectMapper mapper, Cache.NamedKey key)
{
this.cache = cache;
this.mapper = mapper;
this.key = key;
}
public void populate()
{
Futures.addCallback(
Futures.allAsList(cacheFutures),
new FutureCallback<List<Object>>()
{
@Override
public void onSuccess(List<Object> cacheData)
{
CacheUtil.populate(cache, mapper, key, cacheData);
// Help out GC by making sure all references are gone
cacheFutures.clear();
}
@Override
public void onFailure(Throwable throwable)
{
log.error(throwable, "Background caching failed");
}
},
backgroundExecutorService
);
}
}
}

View File

@ -23,18 +23,13 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.client.cache.CachePopulator;
import io.druid.java.util.common.guava.BaseSequence;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.java.util.common.logger.Logger;
import io.druid.query.CacheStrategy;
import io.druid.query.Query;
import io.druid.query.QueryPlus;
@ -46,20 +41,19 @@ import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.ExecutorService;
public class CachingQueryRunner<T> implements QueryRunner<T>
{
private static final Logger log = new Logger(CachingQueryRunner.class);
private final String segmentIdentifier;
private final SegmentDescriptor segmentDescriptor;
private final QueryRunner<T> base;
private final QueryToolChest toolChest;
private final Cache cache;
private final ObjectMapper mapper;
private final CachePopulator cachePopulator;
private final CacheConfig cacheConfig;
private final ListeningExecutorService backgroundExecutorService;
public CachingQueryRunner(
String segmentIdentifier,
@ -68,7 +62,7 @@ public class CachingQueryRunner<T> implements QueryRunner<T>
Cache cache,
QueryToolChest toolchest,
QueryRunner<T> base,
ExecutorService backgroundExecutorService,
CachePopulator cachePopulator,
CacheConfig cacheConfig
)
{
@ -78,7 +72,7 @@ public class CachingQueryRunner<T> implements QueryRunner<T>
this.toolChest = toolchest;
this.cache = cache;
this.mapper = mapper;
this.backgroundExecutorService = MoreExecutors.listeningDecorator(backgroundExecutorService);
this.cachePopulator = cachePopulator;
this.cacheConfig = cacheConfig;
}
@ -140,56 +134,10 @@ public class CachingQueryRunner<T> implements QueryRunner<T>
}
}
final Collection<ListenableFuture<?>> cacheFutures = Collections.synchronizedList(Lists.newLinkedList());
final Collection<ListenableFuture<?>> cacheFutures = Collections.synchronizedList(new LinkedList<>());
if (populateCache) {
final Function cacheFn = strategy.prepareForSegmentLevelCache();
return Sequences.withEffect(
Sequences.map(
base.run(queryPlus, responseContext),
new Function<T, T>()
{
@Override
public T apply(final T input)
{
final SettableFuture<Object> future = SettableFuture.create();
cacheFutures.add(future);
backgroundExecutorService.submit(
new Runnable()
{
@Override
public void run()
{
try {
future.set(cacheFn.apply(input));
}
catch (Exception e) {
// if there is exception, should setException to quit the caching processing
future.setException(e);
}
}
}
);
return input;
}
}
),
new Runnable()
{
@Override
public void run()
{
try {
CacheUtil.populate(cache, mapper, key, Futures.allAsList(cacheFutures).get());
}
catch (Exception e) {
log.error(e, "Error while getting future for cache task");
throw Throwables.propagate(e);
}
}
},
backgroundExecutorService
);
return cachePopulator.wrap(base.run(queryPlus, responseContext), value -> cacheFn.apply(value), cache, key);
} else {
return base.run(queryPlus, responseContext);
}

View File

@ -0,0 +1,141 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.client.cache;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.java.util.common.logger.Logger;
import java.io.ByteArrayOutputStream;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
public class BackgroundCachePopulator implements CachePopulator
{
private static final Logger log = new Logger(BackgroundCachePopulator.class);
private final ListeningExecutorService exec;
private final ObjectMapper objectMapper;
private final CachePopulatorStats cachePopulatorStats;
private final long maxEntrySize;
public BackgroundCachePopulator(
final ExecutorService exec,
final ObjectMapper objectMapper,
final CachePopulatorStats cachePopulatorStats,
final long maxEntrySize
)
{
this.exec = MoreExecutors.listeningDecorator(exec);
this.objectMapper = Preconditions.checkNotNull(objectMapper, "objectMapper");
this.cachePopulatorStats = Preconditions.checkNotNull(cachePopulatorStats, "cachePopulatorStats");
this.maxEntrySize = maxEntrySize;
}
@Override
public <T, CacheType> Sequence<T> wrap(
final Sequence<T> sequence,
final Function<T, CacheType> cacheFn,
final Cache cache,
final Cache.NamedKey cacheKey
)
{
final List<ListenableFuture<CacheType>> cacheFutures = new LinkedList<>();
final Sequence<T> wrappedSequence = Sequences.map(
sequence,
input -> {
cacheFutures.add(exec.submit(() -> cacheFn.apply(input)));
return input;
}
);
return Sequences.withEffect(
wrappedSequence,
() -> {
Futures.addCallback(
Futures.allAsList(cacheFutures),
new FutureCallback<List<CacheType>>()
{
@Override
public void onSuccess(List<CacheType> results)
{
populateCache(cache, cacheKey, results);
// Help out GC by making sure all references are gone
cacheFutures.clear();
}
@Override
public void onFailure(Throwable t)
{
log.error(t, "Background caching failed");
}
},
exec
);
},
MoreExecutors.sameThreadExecutor()
);
}
private <CacheType> void populateCache(
final Cache cache,
final Cache.NamedKey cacheKey,
final List<CacheType> results
)
{
try {
final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
try (JsonGenerator gen = objectMapper.getFactory().createGenerator(bytes)) {
for (CacheType result : results) {
gen.writeObject(result);
if (maxEntrySize > 0 && bytes.size() > maxEntrySize) {
cachePopulatorStats.incrementOversized();
return;
}
}
}
if (maxEntrySize > 0 && bytes.size() > maxEntrySize) {
cachePopulatorStats.incrementOversized();
return;
}
cache.put(cacheKey, bytes.toByteArray());
cachePopulatorStats.incrementOk();
}
catch (Exception e) {
log.warn(e, "Could not populate cache");
cachePopulatorStats.incrementError();
}
}
}

View File

@ -20,10 +20,10 @@
package io.druid.client.cache;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import io.druid.query.Query;
import javax.validation.constraints.Min;
import java.util.Arrays;
import java.util.List;
public class CacheConfig
@ -52,10 +52,13 @@ public class CacheConfig
private int cacheBulkMergeLimit = Integer.MAX_VALUE;
@JsonProperty
private int resultLevelCacheLimit = Integer.MAX_VALUE;
private int maxEntrySize = 1_000_000;
@JsonProperty
private List<String> unCacheable = Arrays.asList(Query.GROUP_BY, Query.SELECT);
private List<String> unCacheable = ImmutableList.of(Query.SELECT);
@JsonProperty
private int resultLevelCacheLimit = Integer.MAX_VALUE;
public boolean isPopulateCache()
{
@ -87,6 +90,11 @@ public class CacheConfig
return cacheBulkMergeLimit;
}
public int getMaxEntrySize()
{
return maxEntrySize;
}
public int getResultLevelCacheLimit()
{
return resultLevelCacheLimit;

View File

@ -20,27 +20,24 @@
package io.druid.client.cache;
import com.google.inject.Inject;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.emitter.service.ServiceEmitter;
import io.druid.java.util.emitter.service.ServiceMetricEvent;
import io.druid.java.util.metrics.AbstractMonitor;
import io.druid.java.util.common.StringUtils;
public class CacheMonitor extends AbstractMonitor
{
// package private for tests
volatile Cache cache;
private final CachePopulatorStats cachePopulatorStats;
private volatile CacheStats prevCacheStats = null;
private volatile CachePopulatorStats.Snapshot prevCachePopulatorStats = null;
public CacheMonitor()
@Inject
public CacheMonitor(final CachePopulatorStats cachePopulatorStats)
{
}
public CacheMonitor(
Cache cache
)
{
this.cache = cache;
this.cachePopulatorStats = cachePopulatorStats;
}
// make it possible to enable CacheMonitor even if cache is not bound
@ -58,10 +55,16 @@ public class CacheMonitor extends AbstractMonitor
final CacheStats currCacheStats = cache.getStats();
final CacheStats deltaCacheStats = currCacheStats.delta(prevCacheStats);
final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder();
emitStats(emitter, "query/cache/delta", deltaCacheStats, builder);
emitStats(emitter, "query/cache/total", currCacheStats, builder);
final CachePopulatorStats.Snapshot currCachePopulatorStats = cachePopulatorStats.snapshot();
final CachePopulatorStats.Snapshot deltaCachePopulatorStats = currCachePopulatorStats.delta(
prevCachePopulatorStats
);
final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder();
emitStats(emitter, "query/cache/delta", deltaCachePopulatorStats, deltaCacheStats, builder);
emitStats(emitter, "query/cache/total", currCachePopulatorStats, currCacheStats, builder);
prevCachePopulatorStats = currCachePopulatorStats;
prevCacheStats = currCacheStats;
// Any custom cache statistics that need monitoring
@ -71,13 +74,15 @@ public class CacheMonitor extends AbstractMonitor
}
private void emitStats(
ServiceEmitter emitter,
final ServiceEmitter emitter,
final String metricPrefix,
CacheStats cacheStats,
ServiceMetricEvent.Builder builder
final CachePopulatorStats.Snapshot cachePopulatorStats,
final CacheStats cacheStats,
final ServiceMetricEvent.Builder builder
)
{
if (cache != null) {
// Cache stats.
emitter.emit(builder.build(StringUtils.format("%s/numEntries", metricPrefix), cacheStats.getNumEntries()));
emitter.emit(builder.build(StringUtils.format("%s/sizeBytes", metricPrefix), cacheStats.getSizeInBytes()));
emitter.emit(builder.build(StringUtils.format("%s/hits", metricPrefix), cacheStats.getNumHits()));
@ -87,6 +92,13 @@ public class CacheMonitor extends AbstractMonitor
emitter.emit(builder.build(StringUtils.format("%s/averageBytes", metricPrefix), cacheStats.averageBytes()));
emitter.emit(builder.build(StringUtils.format("%s/timeouts", metricPrefix), cacheStats.getNumTimeouts()));
emitter.emit(builder.build(StringUtils.format("%s/errors", metricPrefix), cacheStats.getNumErrors()));
// Cache populator stats.
emitter.emit(builder.build(StringUtils.format("%s/put/ok", metricPrefix), cachePopulatorStats.getNumOk()));
emitter.emit(builder.build(StringUtils.format("%s/put/error", metricPrefix), cachePopulatorStats.getNumError()));
emitter.emit(
builder.build(StringUtils.format("%s/put/oversized", metricPrefix), cachePopulatorStats.getNumOversized())
);
}
}
}

View File

@ -17,21 +17,18 @@
* under the License.
*/
package io.druid.guice.annotations;
package io.druid.client.cache;
import com.google.inject.BindingAnnotation;
import io.druid.java.util.common.guava.Sequence;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.function.Function;
/**
*
*/
@BindingAnnotation
@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface BackgroundCaching
public interface CachePopulator
{
<T, CacheType> Sequence<T> wrap(
Sequence<T> sequence,
Function<T, CacheType> cacheFn,
Cache cache,
Cache.NamedKey cacheKey
);
}

View File

@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.client.cache;
import java.util.concurrent.atomic.AtomicLong;
/**
*/
public class CachePopulatorStats
{
private final AtomicLong okCounter = new AtomicLong();
private final AtomicLong errorCounter = new AtomicLong();
private final AtomicLong oversizedCounter = new AtomicLong();
public void incrementOk()
{
okCounter.incrementAndGet();
}
public void incrementError()
{
errorCounter.incrementAndGet();
}
public void incrementOversized()
{
oversizedCounter.incrementAndGet();
}
public Snapshot snapshot()
{
return new Snapshot(
okCounter.get(),
errorCounter.get(),
oversizedCounter.get()
);
}
public static class Snapshot
{
private final long numOk;
private final long numError;
private final long numOversized;
Snapshot(final long numOk, final long numError, final long numOversized)
{
this.numOk = numOk;
this.numError = numError;
this.numOversized = numOversized;
}
public long getNumOk()
{
return numOk;
}
public long getNumError()
{
return numError;
}
public long getNumOversized()
{
return numOversized;
}
public Snapshot delta(Snapshot oldSnapshot)
{
if (oldSnapshot == null) {
return this;
} else {
return new Snapshot(
numOk - oldSnapshot.numOk,
numError - oldSnapshot.numError,
numOversized - oldSnapshot.numOversized
);
}
}
}
}

View File

@ -0,0 +1,127 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.client.cache;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.SequenceWrapper;
import io.druid.java.util.common.guava.Sequences;
import io.druid.java.util.common.logger.Logger;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
public class ForegroundCachePopulator implements CachePopulator
{
private static final Logger log = new Logger(ForegroundCachePopulator.class);
private final Object lock = new Object();
private final ObjectMapper objectMapper;
private final CachePopulatorStats cachePopulatorStats;
private final long maxEntrySize;
public ForegroundCachePopulator(
final ObjectMapper objectMapper,
final CachePopulatorStats cachePopulatorStats,
final long maxEntrySize
)
{
this.objectMapper = Preconditions.checkNotNull(objectMapper, "objectMapper");
this.cachePopulatorStats = Preconditions.checkNotNull(cachePopulatorStats, "cachePopulatorStats");
this.maxEntrySize = maxEntrySize;
}
@Override
public <T, CacheType> Sequence<T> wrap(
final Sequence<T> sequence,
final Function<T, CacheType> cacheFn,
final Cache cache,
final Cache.NamedKey cacheKey
)
{
final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
final AtomicBoolean tooBig = new AtomicBoolean(false);
final JsonGenerator jsonGenerator;
try {
jsonGenerator = objectMapper.getFactory().createGenerator(bytes);
}
catch (IOException e) {
throw new RuntimeException(e);
}
return Sequences.wrap(
Sequences.map(
sequence,
input -> {
if (!tooBig.get()) {
synchronized (lock) {
try {
jsonGenerator.writeObject(cacheFn.apply(input));
// Not flushing jsonGenerator before checking this, but should be ok since Jackson buffers are
// typically just a few KB, and we don't want to waste cycles flushing.
if (maxEntrySize > 0 && bytes.size() > maxEntrySize) {
tooBig.set(true);
}
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
}
return input;
}
),
new SequenceWrapper()
{
@Override
public void after(final boolean isDone, final Throwable thrown) throws Exception
{
synchronized (lock) {
jsonGenerator.close();
if (isDone) {
// Check tooBig, then check maxEntrySize one more time, after closing/flushing jsonGenerator.
if (tooBig.get() || (maxEntrySize > 0 && bytes.size() > maxEntrySize)) {
cachePopulatorStats.incrementOversized();
return;
}
try {
cache.put(cacheKey, bytes.toByteArray());
cachePopulatorStats.incrementOk();
}
catch (Exception e) {
log.warn(e, "Unable to write to cache");
cachePopulatorStats.incrementError();
}
}
}
}
}
);
}
}

View File

@ -21,8 +21,8 @@ package io.druid.client.cache;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.druid.java.util.emitter.service.ServiceEmitter;
import io.druid.java.util.common.logger.Logger;
import io.druid.java.util.emitter.service.ServiceEmitter;
import javax.annotation.Nullable;
import java.util.Collections;

View File

@ -24,6 +24,7 @@ import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.name.Names;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CachePopulatorStats;
import io.druid.client.cache.CacheProvider;
import io.druid.guice.annotations.Global;
@ -48,6 +49,7 @@ public class CacheModule implements Module
public void configure(Binder binder)
{
binder.bind(Cache.class).toProvider(Key.get(CacheProvider.class, Global.class)).in(ManageLifecycle.class);
binder.bind(CachePopulatorStats.class).in(LazySingleton.class);
JsonConfigProvider.bind(binder, prefix, CacheProvider.class, Global.class);
binder.install(new HybridCacheModule(prefix));

View File

@ -19,22 +19,26 @@
package io.druid.guice;
import com.google.common.util.concurrent.MoreExecutors;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.ProvisionException;
import io.druid.client.cache.BackgroundCachePopulator;
import io.druid.client.cache.CacheConfig;
import io.druid.client.cache.CachePopulator;
import io.druid.client.cache.CachePopulatorStats;
import io.druid.client.cache.ForegroundCachePopulator;
import io.druid.collections.BlockingPool;
import io.druid.collections.DefaultBlockingPool;
import io.druid.collections.NonBlockingPool;
import io.druid.collections.StupidPool;
import io.druid.common.utils.VMUtils;
import io.druid.guice.annotations.BackgroundCaching;
import io.druid.guice.annotations.Global;
import io.druid.guice.annotations.Merging;
import io.druid.guice.annotations.Processing;
import io.druid.guice.annotations.Smile;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.concurrent.ExecutorServiceConfig;
import io.druid.java.util.common.lifecycle.Lifecycle;
@ -64,14 +68,15 @@ public class DruidProcessingModule implements Module
}
@Provides
@BackgroundCaching
@LazySingleton
public ExecutorService getBackgroundExecutorService(
public CachePopulator getCachePopulator(
@Smile ObjectMapper smileMapper,
CachePopulatorStats cachePopulatorStats,
CacheConfig cacheConfig
)
{
if (cacheConfig.getNumBackgroundThreads() > 0) {
return Executors.newFixedThreadPool(
final ExecutorService exec = Executors.newFixedThreadPool(
cacheConfig.getNumBackgroundThreads(),
new ThreadFactoryBuilder()
.setNameFormat("background-cacher-%d")
@ -79,8 +84,10 @@ public class DruidProcessingModule implements Module
.setPriority(Thread.MIN_PRIORITY)
.build()
);
return new BackgroundCachePopulator(exec, smileMapper, cachePopulatorStats, cacheConfig.getMaxEntrySize());
} else {
return MoreExecutors.sameThreadExecutor();
return new ForegroundCachePopulator(smileMapper, cachePopulatorStats, cacheConfig.getMaxEntrySize());
}
}

View File

@ -22,16 +22,14 @@ package io.druid.guice;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Provides;
import io.druid.client.cache.CacheConfig;
import io.druid.collections.BlockingPool;
import io.druid.collections.DummyBlockingPool;
import io.druid.collections.DummyNonBlockingPool;
import io.druid.collections.NonBlockingPool;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.guice.annotations.BackgroundCaching;
import io.druid.guice.annotations.Global;
import io.druid.guice.annotations.Merging;
import io.druid.guice.annotations.Processing;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.java.util.common.concurrent.ExecutorServiceConfig;
import io.druid.java.util.common.logger.Logger;
import io.druid.query.DruidProcessingConfig;
@ -58,20 +56,6 @@ public class RouterProcessingModule implements Module
MetricsModule.register(binder, ExecutorServiceMonitor.class);
}
@Provides
@BackgroundCaching
@LazySingleton
public ExecutorService getBackgroundExecutorService(CacheConfig cacheConfig)
{
if (cacheConfig.getNumBackgroundThreads() > 0) {
log.error(
"numBackgroundThreads[%d] configured, that is ignored on Router",
cacheConfig.getNumBackgroundThreads()
);
}
return Execs.dummy();
}
@Provides
@Processing
@ManageLifecycle

View File

@ -40,6 +40,7 @@ import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.client.cache.CachePopulatorStats;
import io.druid.common.guava.ThreadRenamingCallable;
import io.druid.data.input.Committer;
import io.druid.data.input.InputRow;
@ -166,7 +167,8 @@ public class AppenderatorImpl implements Appenderator
IndexIO indexIO,
IndexMerger indexMerger,
Cache cache,
CacheConfig cacheConfig
CacheConfig cacheConfig,
CachePopulatorStats cachePopulatorStats
)
{
this.schema = Preconditions.checkNotNull(schema, "schema");
@ -186,7 +188,8 @@ public class AppenderatorImpl implements Appenderator
conglomerate,
queryExecutorService,
Preconditions.checkNotNull(cache, "cache"),
cacheConfig
cacheConfig,
cachePopulatorStats
);
maxBytesTuningConfig = TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory());
log.info("Created Appenderator for dataSource[%s].", schema.getDataSource());

View File

@ -22,6 +22,7 @@ package io.druid.segment.realtime.appenderator;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.client.cache.CachePopulatorStats;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.emitter.service.ServiceEmitter;
import io.druid.query.QueryRunnerFactoryConglomerate;
@ -52,7 +53,8 @@ public class Appenderators
ServiceEmitter emitter,
ExecutorService queryExecutorService,
Cache cache,
CacheConfig cacheConfig
CacheConfig cacheConfig,
CachePopulatorStats cachePopulatorStats
)
{
return new AppenderatorImpl(
@ -68,7 +70,8 @@ public class Appenderators
indexIO,
indexMerger,
cache,
cacheConfig
cacheConfig,
cachePopulatorStats
);
}
@ -120,6 +123,7 @@ public class Appenderators
indexIO,
indexMerger,
null,
null,
null
);
}

View File

@ -21,11 +21,11 @@ package io.druid.segment.realtime.appenderator;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.java.util.emitter.service.ServiceEmitter;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.client.cache.CachePopulatorStats;
import io.druid.guice.annotations.Processing;
import io.druid.java.util.emitter.service.ServiceEmitter;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
@ -51,6 +51,7 @@ public class DefaultRealtimeAppenderatorFactory implements AppenderatorFactory
private final IndexMerger indexMerger;
private final Cache cache;
private final CacheConfig cacheConfig;
private final CachePopulatorStats cachePopulatorStats;
public DefaultRealtimeAppenderatorFactory(
@JacksonInject ServiceEmitter emitter,
@ -62,7 +63,8 @@ public class DefaultRealtimeAppenderatorFactory implements AppenderatorFactory
@JacksonInject IndexIO indexIO,
@JacksonInject IndexMerger indexMerger,
@JacksonInject Cache cache,
@JacksonInject CacheConfig cacheConfig
@JacksonInject CacheConfig cacheConfig,
@JacksonInject CachePopulatorStats cachePopulatorStats
)
{
this.emitter = emitter;
@ -75,6 +77,7 @@ public class DefaultRealtimeAppenderatorFactory implements AppenderatorFactory
this.indexMerger = indexMerger;
this.cache = cache;
this.cacheConfig = cacheConfig;
this.cachePopulatorStats = cachePopulatorStats;
}
@Override
@ -103,7 +106,8 @@ public class DefaultRealtimeAppenderatorFactory implements AppenderatorFactory
emitter,
queryExecutorService,
cache,
cacheConfig
cacheConfig,
cachePopulatorStats
);
}

View File

@ -24,15 +24,17 @@ import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.MoreExecutors;
import io.druid.java.util.emitter.EmittingLogger;
import io.druid.java.util.emitter.service.ServiceEmitter;
import io.druid.client.CachingQueryRunner;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.client.cache.CachePopulatorStats;
import io.druid.client.cache.ForegroundCachePopulator;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.guava.CloseQuietly;
import io.druid.java.util.common.guava.FunctionalIterable;
import io.druid.java.util.emitter.EmittingLogger;
import io.druid.java.util.emitter.service.ServiceEmitter;
import io.druid.query.BySegmentQueryRunner;
import io.druid.query.CPUTimeMetricQueryRunner;
import io.druid.query.MetricsEmittingQueryRunner;
@ -76,6 +78,7 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
private final ExecutorService queryExecutorService;
private final Cache cache;
private final CacheConfig cacheConfig;
private final CachePopulatorStats cachePopulatorStats;
public SinkQuerySegmentWalker(
String dataSource,
@ -85,7 +88,8 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
QueryRunnerFactoryConglomerate conglomerate,
ExecutorService queryExecutorService,
Cache cache,
CacheConfig cacheConfig
CacheConfig cacheConfig,
CachePopulatorStats cachePopulatorStats
)
{
this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
@ -96,6 +100,7 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
this.queryExecutorService = Preconditions.checkNotNull(queryExecutorService, "queryExecutorService");
this.cache = Preconditions.checkNotNull(cache, "cache");
this.cacheConfig = Preconditions.checkNotNull(cacheConfig, "cacheConfig");
this.cachePopulatorStats = Preconditions.checkNotNull(cachePopulatorStats, "cachePopulatorStats");
if (!cache.isLocal()) {
log.warn("Configured cache[%s] is not local, caching will not be enabled.", cache.getClass().getName());
@ -235,7 +240,12 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
cache,
toolChest,
baseRunner,
MoreExecutors.sameThreadExecutor(),
// Always populate in foreground regardless of config
new ForegroundCachePopulator(
objectMapper,
cachePopulatorStats,
cacheConfig.getMaxEntrySize()
),
cacheConfig
);
} else {

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.client.cache.CachePopulatorStats;
import io.druid.common.guava.ThreadRenamingCallable;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.StringUtils;
@ -73,6 +74,7 @@ public class FlushingPlumber extends RealtimePlumber
IndexIO indexIO,
Cache cache,
CacheConfig cacheConfig,
CachePopulatorStats cachePopulatorStats,
ObjectMapper objectMapper
)
@ -92,6 +94,7 @@ public class FlushingPlumber extends RealtimePlumber
indexIO,
cache,
cacheConfig,
cachePopulatorStats,
objectMapper
);

View File

@ -24,10 +24,11 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import io.druid.java.util.emitter.service.ServiceEmitter;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.client.cache.CachePopulatorStats;
import io.druid.guice.annotations.Processing;
import io.druid.java.util.emitter.service.ServiceEmitter;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMergerV9;
@ -57,6 +58,7 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool
private final IndexIO indexIO;
private final Cache cache;
private final CacheConfig cacheConfig;
private final CachePopulatorStats cachePopulatorStats;
private final ObjectMapper objectMapper;
@JsonCreator
@ -70,6 +72,7 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool
@JacksonInject IndexIO indexIO,
@JacksonInject Cache cache,
@JacksonInject CacheConfig cacheConfig,
@JacksonInject CachePopulatorStats cachePopulatorStats,
@JacksonInject ObjectMapper objectMapper
)
{
@ -85,6 +88,7 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool
indexIO,
cache,
cacheConfig,
cachePopulatorStats,
objectMapper
);
@ -97,6 +101,7 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool
this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO");
this.cache = cache;
this.cacheConfig = cacheConfig;
this.cachePopulatorStats = cachePopulatorStats;
this.objectMapper = objectMapper;
}
@ -122,6 +127,7 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool
indexIO,
cache,
cacheConfig,
cachePopulatorStats,
objectMapper
);
}

View File

@ -32,6 +32,7 @@ import com.google.common.collect.Lists;
import com.google.common.primitives.Ints;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.client.cache.CachePopulatorStats;
import io.druid.common.guava.ThreadRenamingCallable;
import io.druid.common.guava.ThreadRenamingRunnable;
import io.druid.common.utils.VMUtils;
@ -146,6 +147,7 @@ public class RealtimePlumber implements Plumber
IndexIO indexIO,
Cache cache,
CacheConfig cacheConfig,
CachePopulatorStats cachePopulatorStats,
ObjectMapper objectMapper
)
{
@ -168,7 +170,8 @@ public class RealtimePlumber implements Plumber
conglomerate,
queryExecutorService,
cache,
cacheConfig
cacheConfig,
cachePopulatorStats
);
log.info("Creating plumber using rejectionPolicy[%s]", getRejectionPolicy());

View File

@ -23,10 +23,11 @@ import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import io.druid.java.util.emitter.service.ServiceEmitter;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.client.cache.CachePopulatorStats;
import io.druid.guice.annotations.Processing;
import io.druid.java.util.emitter.service.ServiceEmitter;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMergerV9;
@ -54,6 +55,7 @@ public class RealtimePlumberSchool implements PlumberSchool
private final IndexIO indexIO;
private final Cache cache;
private final CacheConfig cacheConfig;
private final CachePopulatorStats cachePopulatorStats;
private final ObjectMapper objectMapper;
@JsonCreator
@ -69,6 +71,7 @@ public class RealtimePlumberSchool implements PlumberSchool
@JacksonInject IndexIO indexIO,
@JacksonInject Cache cache,
@JacksonInject CacheConfig cacheConfig,
@JacksonInject CachePopulatorStats cachePopulatorStats,
@JacksonInject ObjectMapper objectMapper
)
{
@ -84,6 +87,7 @@ public class RealtimePlumberSchool implements PlumberSchool
this.cache = cache;
this.cacheConfig = cacheConfig;
this.cachePopulatorStats = cachePopulatorStats;
this.objectMapper = objectMapper;
}
@ -111,6 +115,7 @@ public class RealtimePlumberSchool implements PlumberSchool
indexIO,
cache,
cacheConfig,
cachePopulatorStats,
objectMapper
);
}

View File

@ -26,7 +26,7 @@ import com.google.inject.Inject;
import io.druid.client.CachingQueryRunner;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.guice.annotations.BackgroundCaching;
import io.druid.client.cache.CachePopulator;
import io.druid.guice.annotations.Processing;
import io.druid.guice.annotations.Smile;
import io.druid.java.util.common.ISE;
@ -77,7 +77,7 @@ public class ServerManager implements QuerySegmentWalker
private final QueryRunnerFactoryConglomerate conglomerate;
private final ServiceEmitter emitter;
private final ExecutorService exec;
private final ExecutorService cachingExec;
private final CachePopulator cachePopulator;
private final Cache cache;
private final ObjectMapper objectMapper;
private final CacheConfig cacheConfig;
@ -89,7 +89,7 @@ public class ServerManager implements QuerySegmentWalker
QueryRunnerFactoryConglomerate conglomerate,
ServiceEmitter emitter,
@Processing ExecutorService exec,
@BackgroundCaching ExecutorService cachingExec,
CachePopulator cachePopulator,
@Smile ObjectMapper objectMapper,
Cache cache,
CacheConfig cacheConfig,
@ -101,7 +101,7 @@ public class ServerManager implements QuerySegmentWalker
this.emitter = emitter;
this.exec = exec;
this.cachingExec = cachingExec;
this.cachePopulator = cachePopulator;
this.cache = cache;
this.objectMapper = objectMapper;
@ -298,7 +298,7 @@ public class ServerManager implements QuerySegmentWalker
cache,
toolChest,
metricsEmittingQueryRunnerInner,
cachingExec,
cachePopulator,
cacheConfig
);

View File

@ -21,10 +21,11 @@ package io.druid.client;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CachePopulator;
import io.druid.client.cache.CacheConfig;
import io.druid.client.cache.CachePopulatorStats;
import io.druid.client.cache.ForegroundCachePopulator;
import io.druid.client.cache.MapCache;
import io.druid.client.selector.QueryableDruidServer;
import io.druid.client.selector.ServerSelector;
@ -74,7 +75,9 @@ public class CachingClusteredClientFunctionalityTest
timeline = new VersionedIntervalTimeline<>(Ordering.natural());
serverView = EasyMock.createNiceMock(TimelineServerView.class);
cache = MapCache.create(100000);
client = makeClient(MoreExecutors.sameThreadExecutor());
client = makeClient(
new ForegroundCachePopulator(CachingClusteredClientTest.jsonMapper, new CachePopulatorStats(), -1)
);
}
@Test
@ -199,13 +202,13 @@ public class CachingClusteredClientFunctionalityTest
));
}
protected CachingClusteredClient makeClient(final ListeningExecutorService backgroundExecutorService)
protected CachingClusteredClient makeClient(final CachePopulator cachePopulator)
{
return makeClient(backgroundExecutorService, cache, 10);
return makeClient(cachePopulator, cache, 10);
}
protected CachingClusteredClient makeClient(
final ListeningExecutorService backgroundExecutorService,
final CachePopulator cachePopulator,
final Cache cache,
final int mergeLimit
)
@ -245,7 +248,7 @@ public class CachingClusteredClientFunctionalityTest
},
cache,
CachingClusteredClientTest.jsonMapper,
backgroundExecutorService,
cachePopulator,
new CacheConfig()
{
@Override

View File

@ -43,8 +43,12 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import io.druid.client.cache.BackgroundCachePopulator;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.client.cache.CachePopulator;
import io.druid.client.cache.CachePopulatorStats;
import io.druid.client.cache.ForegroundCachePopulator;
import io.druid.client.cache.MapCache;
import io.druid.client.selector.HighestPriorityTierSelectorStrategy;
import io.druid.client.selector.QueryableDruidServer;
@ -330,7 +334,7 @@ public class CachingClusteredClientTest
timeline = new VersionedIntervalTimeline<>(Ordering.natural());
serverView = EasyMock.createNiceMock(TimelineServerView.class);
cache = MapCache.create(100000);
client = makeClient(MoreExecutors.sameThreadExecutor());
client = makeClient(new ForegroundCachePopulator(jsonMapper, new CachePopulatorStats(), -1));
servers = new DruidServer[]{
new DruidServer("test1", "test1", null, 10, ServerType.HISTORICAL, "bye", 0),
@ -422,7 +426,14 @@ public class CachingClusteredClientTest
}
};
client = makeClient(randomizingExecutorService);
client = makeClient(
new BackgroundCachePopulator(
randomizingExecutorService,
jsonMapper,
new CachePopulatorStats(),
-1
)
);
// callback to be run every time a query run is complete, to ensure all background
// caching tasks are executed, and cache is populated before we move onto the next query
@ -579,7 +590,7 @@ public class CachingClusteredClientTest
.andReturn(ImmutableMap.of())
.once();
EasyMock.replay(cache);
client = makeClient(MoreExecutors.sameThreadExecutor(), cache, limit);
client = makeClient(new ForegroundCachePopulator(jsonMapper, new CachePopulatorStats(), -1), cache, limit);
final DruidServer lastServer = servers[random.nextInt(servers.length)];
final DataSegment dataSegment = EasyMock.createNiceMock(DataSegment.class);
EasyMock.expect(dataSegment.getIdentifier()).andReturn(DATA_SOURCE).anyTimes();
@ -604,7 +615,7 @@ public class CachingClusteredClientTest
.andReturn(ImmutableMap.of())
.once();
EasyMock.replay(cache);
client = makeClient(MoreExecutors.sameThreadExecutor(), cache, 0);
client = makeClient(new ForegroundCachePopulator(jsonMapper, new CachePopulatorStats(), -1), cache, 0);
getDefaultQueryRunner().run(QueryPlus.wrap(query), context);
EasyMock.verify(cache);
EasyMock.verify(dataSegment);
@ -2630,13 +2641,13 @@ public class CachingClusteredClientTest
EasyMock.reset(mocks);
}
protected CachingClusteredClient makeClient(final ListeningExecutorService backgroundExecutorService)
protected CachingClusteredClient makeClient(final CachePopulator cachePopulator)
{
return makeClient(backgroundExecutorService, cache, 10);
return makeClient(cachePopulator, cache, 10);
}
protected CachingClusteredClient makeClient(
final ListeningExecutorService backgroundExecutorService,
final CachePopulator cachePopulator,
final Cache cache,
final int mergeLimit
)
@ -2676,7 +2687,7 @@ public class CachingClusteredClientTest
},
cache,
jsonMapper,
backgroundExecutorService,
cachePopulator,
new CacheConfig()
{
@Override

View File

@ -19,20 +19,26 @@
package io.druid.client;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.MoreExecutors;
import io.druid.client.cache.BackgroundCachePopulator;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.client.cache.CachePopulator;
import io.druid.client.cache.CachePopulatorStats;
import io.druid.client.cache.CacheStats;
import io.druid.client.cache.ForegroundCachePopulator;
import io.druid.client.cache.MapCache;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.Intervals;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.java.util.common.granularity.Granularities;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.SequenceWrapper;
@ -63,15 +69,15 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -98,14 +104,22 @@ public class CachingQueryRunnerTest
DateTimes.of("2011-01-09"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983
};
private ExecutorService backgroundExecutorService;
private ObjectMapper objectMapper;
private CachePopulator cachePopulator;
public CachingQueryRunnerTest(int numBackgroundThreads)
{
objectMapper = new DefaultObjectMapper();
if (numBackgroundThreads > 0) {
backgroundExecutorService = Executors.newFixedThreadPool(numBackgroundThreads);
cachePopulator = new BackgroundCachePopulator(
Execs.multiThreaded(numBackgroundThreads, "CachingQueryRunnerTest-%d"),
objectMapper,
new CachePopulatorStats(),
-1
);
} else {
backgroundExecutorService = MoreExecutors.sameThreadExecutor();
cachePopulator = new ForegroundCachePopulator(objectMapper, new CachePopulatorStats(), -1);
}
}
@ -274,7 +288,7 @@ public class CachingQueryRunnerTest
return resultSeq;
}
},
backgroundExecutorService,
cachePopulator,
new CacheConfig()
{
@Override
@ -331,7 +345,7 @@ public class CachingQueryRunnerTest
List<Result> expectedResults,
Query query,
QueryToolChest toolchest
)
) throws IOException
{
DefaultObjectMapper objectMapper = new DefaultObjectMapper();
String segmentIdentifier = "segment";
@ -345,12 +359,7 @@ public class CachingQueryRunnerTest
);
Cache cache = MapCache.create(1024 * 1024);
CacheUtil.populate(
cache,
objectMapper,
cacheKey,
Iterables.transform(expectedResults, cacheStrategy.prepareForSegmentLevelCache())
);
cache.put(cacheKey, toByteArray(Iterables.transform(expectedResults, cacheStrategy.prepareForSegmentLevelCache())));
CachingQueryRunner runner = new CachingQueryRunner(
segmentIdentifier,
@ -367,7 +376,7 @@ public class CachingQueryRunnerTest
return Sequences.empty();
}
},
backgroundExecutorService,
cachePopulator,
new CacheConfig()
{
@Override
@ -434,6 +443,19 @@ public class CachingQueryRunnerTest
return retVal;
}
private <T> byte[] toByteArray(final Iterable<T> results) throws IOException
{
final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
try (JsonGenerator gen = objectMapper.getFactory().createGenerator(bytes)) {
for (T result : results) {
gen.writeObject(result);
}
}
return bytes.toByteArray();
}
private static class AssertingClosable implements Closeable
{

View File

@ -0,0 +1,163 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.client.cache;
import com.fasterxml.jackson.databind.MappingIterator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterators;
import com.google.common.primitives.Ints;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.java.util.common.guava.Sequences;
import io.druid.java.util.common.jackson.JacksonUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
public class CachePopulatorTest
{
private final ExecutorService exec = Execs.multiThreaded(2, "cache-populator-test-%d");
private final ObjectMapper objectMapper = new ObjectMapper();
private final Cache cache = new MapCache(new ByteCountingLRUMap(Long.MAX_VALUE));
private final CachePopulatorStats stats = new CachePopulatorStats();
@After
public void tearDown()
{
exec.shutdownNow();
}
@Test
public void testForegroundPopulator()
{
final CachePopulator populator = new ForegroundCachePopulator(objectMapper, stats, -1);
final List<String> strings = ImmutableList.of("foo", "bar");
Assert.assertEquals(strings, wrapAndReturn(populator, makeKey(1), strings));
Assert.assertEquals(strings, readFromCache(makeKey(1)));
Assert.assertEquals(1, stats.snapshot().getNumOk());
Assert.assertEquals(0, stats.snapshot().getNumError());
Assert.assertEquals(0, stats.snapshot().getNumOversized());
}
@Test
public void testForegroundPopulatorMaxEntrySize()
{
final CachePopulator populator = new ForegroundCachePopulator(objectMapper, stats, 30);
final List<String> strings = ImmutableList.of("foo", "bar");
final List<String> strings2 = ImmutableList.of("foo", "baralararararararaarararararaa");
Assert.assertEquals(strings, wrapAndReturn(populator, makeKey(1), strings));
Assert.assertEquals(strings, readFromCache(makeKey(1)));
Assert.assertEquals(strings2, wrapAndReturn(populator, makeKey(2), strings2));
Assert.assertNull(readFromCache(makeKey(2)));
Assert.assertEquals(1, stats.snapshot().getNumOk());
Assert.assertEquals(0, stats.snapshot().getNumError());
Assert.assertEquals(1, stats.snapshot().getNumOversized());
}
@Test(timeout = 60000L)
public void testBackgroundPopulator() throws InterruptedException
{
final CachePopulator populator = new BackgroundCachePopulator(exec, objectMapper, stats, -1);
final List<String> strings = ImmutableList.of("foo", "bar");
Assert.assertEquals(strings, wrapAndReturn(populator, makeKey(1), strings));
// Wait for background updates to happen.
while (cache.getStats().getNumEntries() < 1) {
Thread.sleep(100);
}
Assert.assertEquals(strings, readFromCache(makeKey(1)));
Assert.assertEquals(1, stats.snapshot().getNumOk());
Assert.assertEquals(0, stats.snapshot().getNumError());
Assert.assertEquals(0, stats.snapshot().getNumOversized());
}
@Test(timeout = 60000L)
public void testBackgroundPopulatorMaxEntrySize() throws InterruptedException
{
final CachePopulator populator = new BackgroundCachePopulator(exec, objectMapper, stats, 30);
final List<String> strings = ImmutableList.of("foo", "bar");
final List<String> strings2 = ImmutableList.of("foo", "baralararararararaarararararaa");
Assert.assertEquals(strings, wrapAndReturn(populator, makeKey(1), strings));
Assert.assertEquals(strings2, wrapAndReturn(populator, makeKey(2), strings2));
// Wait for background updates to happen.
while (cache.getStats().getNumEntries() < 1 || stats.snapshot().getNumOversized() < 1) {
Thread.sleep(100);
}
Assert.assertEquals(strings, readFromCache(makeKey(1)));
Assert.assertNull(readFromCache(makeKey(2)));
Assert.assertEquals(1, stats.snapshot().getNumOk());
Assert.assertEquals(0, stats.snapshot().getNumError());
Assert.assertEquals(1, stats.snapshot().getNumOversized());
}
private static Cache.NamedKey makeKey(final int n)
{
return new Cache.NamedKey("test", Ints.toByteArray(n));
}
private List<String> wrapAndReturn(
final CachePopulator populator,
final Cache.NamedKey key,
final List<String> strings
)
{
return populator.wrap(Sequences.simple(strings), s -> ImmutableMap.of("s", s), cache, key).toList();
}
private List<String> readFromCache(final Cache.NamedKey key)
{
final byte[] bytes = cache.get(key);
if (bytes == null) {
return null;
}
try (
final MappingIterator<Map<String, String>> iterator = objectMapper.readValues(
objectMapper.getFactory().createParser(bytes),
JacksonUtils.TYPE_REFERENCE_MAP_STRING_STRING
)
) {
final List<Map<String, String>> retVal = new ArrayList<>();
Iterators.addAll(retVal, iterator);
// Undo map-wrapping that was done in wrapAndReturn.
return retVal.stream().map(m -> m.get("s")).collect(Collectors.toList());
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
}

View File

@ -22,6 +22,7 @@ package io.druid.segment.realtime;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.client.cache.CacheConfig;
import io.druid.client.cache.CachePopulatorStats;
import io.druid.client.cache.MapCache;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.JSONParseSpec;
@ -115,6 +116,7 @@ public class FireDepartmentTest
TestHelper.getTestIndexIO(OffHeapMemorySegmentWriteOutMediumFactory.instance()),
MapCache.create(0),
NO_CACHE_CONFIG,
new CachePopulatorStats(),
TestHelper.makeJsonMapper()
),

View File

@ -22,6 +22,7 @@ package io.druid.segment.realtime.appenderator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import io.druid.client.cache.CacheConfig;
import io.druid.client.cache.CachePopulatorStats;
import io.druid.client.cache.MapCache;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.JSONParseSpec;
@ -273,7 +274,8 @@ public class AppenderatorTester implements AutoCloseable
emitter,
queryExecutor,
MapCache.create(2048),
new CacheConfig()
new CacheConfig(),
new CachePopulatorStats()
);
}

View File

@ -26,6 +26,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.Files;
import com.google.common.util.concurrent.MoreExecutors;
import io.druid.client.cache.CachePopulatorStats;
import io.druid.client.cache.MapCache;
import io.druid.data.input.Committer;
import io.druid.data.input.InputRow;
@ -223,6 +224,7 @@ public class RealtimePlumberSchoolTest
TestHelper.getTestIndexIO(segmentWriteOutMediumFactory),
MapCache.create(0),
FireDepartmentTest.NO_CACHE_CONFIG,
new CachePopulatorStats(),
TestHelper.makeJsonMapper()
);

View File

@ -26,8 +26,9 @@ import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.MoreExecutors;
import io.druid.client.cache.CacheConfig;
import io.druid.client.cache.CachePopulatorStats;
import io.druid.client.cache.ForegroundCachePopulator;
import io.druid.client.cache.LocalCacheProvider;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.IAE;
@ -153,7 +154,7 @@ public class ServerManagerTest
},
new NoopServiceEmitter(),
serverManagerExec,
MoreExecutors.sameThreadExecutor(),
new ForegroundCachePopulator(new DefaultObjectMapper(), new CachePopulatorStats(), -1),
new DefaultObjectMapper(),
new LocalCacheProvider().get(),
new CacheConfig(),