mirror of https://github.com/apache/druid.git
General Caching Query Runners cleanup
* Add type strictness to CachingClusteredClient. * Add background caching to CachingClusteredClient. Gives between 0% and 5% query speed increase. * Add @BackgroundCaching annotation for injected ExecutorService items * Add `numBackgroundThreads' configuration options to CacheConfig (default 0 aka same thread legacy behavior) * Add unit tests for CacheConfig * Add an abstract caching query runner class, currently it doesn't do anything exceppt simply make the two caching queries distinct. * Add caching to CachingQueryRunner. Gives up to a WHOPPING 40% reduction in query time on HLL queries * Updated docs with more info on cache settings.
This commit is contained in:
parent
187126dd4b
commit
7b65f0635d
|
@ -14,6 +14,30 @@ There are three JVM parameters that we set on all of our processes:
|
|||
2. `-Dfile.encoding=UTF-8` This is similar to timezone, we test assuming UTF-8. Local encodings might work, but they also might result in weird and interesting bugs.
|
||||
3. `-Djava.io.tmpdir=<a path>` Various parts of the system that interact with the file system do it via temporary files, and these files can get somewhat large. Many production systems are set up to have small (but fast) `/tmp` directories, which can be problematic with Druid so we recommend pointing the JVM’s tmp directory to something with a little more meat.
|
||||
|
||||
## CLI specific
|
||||
|
||||
Some of the CLI options have configurations that only apply in that particular CLI functionality
|
||||
|
||||
### Server Historical
|
||||
|
||||
|Property|Description|Allowed Values|Default|
|
||||
|--------|-----------|--------------|-------|
|
||||
|`druid.cache.type`|Set the caching type.|`local`, `memcached`|`local`|
|
||||
|`druid.historical.cache.useCache`|Allow cache to be used. Cache will NOT be used unless this is set.|`true`,`false`|`false`|
|
||||
|`druid.historical.cache.populateCache`|Allow cache to be populated. Cache will NOT be populated unless this is set.|`true`,`false`|`false`|
|
||||
|`druid.historical.cache.unCacheable`|Do not attempt to cache queries whose types are in this array|array of valid values for queryType|`["groupBy","select"]`|
|
||||
|`druid.historical.cache.numBackgroundThreads`|Number of background threads in the thread pool to use for eventual-consistency caching results if caching is used. It is recommended to set this value greater or equal to the number of processing threads. To force caching to execute in the same thread as the query (query results are blocked on caching completion), use a thread count of 0. Setups who use a Druid backend in programatic settings (sub-second re-querying) should consider setting this to 0 to prevent eventual consistency from biting overall performance in the ass. If this is you, please experiment to find out what setting works best. |Non-negative integer|`0`|
|
||||
|
||||
### Server Broker
|
||||
|
||||
|Property|Description|Allowed Values|Default|
|
||||
|--------|-----------|--------------|-------|
|
||||
|`druid.cache.type`|Set the caching type if caching is enabled.| `local`, `memcached`|`local`|
|
||||
|`druid.broker.cache.useCache`|Allow cache to be used. Cache will NOT be used unless this is set.|`true`,`false`|`false`|
|
||||
|`druid.broker.cache.populateCache`|Allow cache to be populated. Cache will NOT be populated unless this is set.|`true`,`false`|`false`|
|
||||
|`druid.broker.cache.unCacheable`|Do not attempt to cache queries whose types are in this array|array of valid values for queryType|`["groupBy","select"]`|
|
||||
|`druid.broker.cache.numBackgroundThreads`|Number of background threads in the thread pool to use for eventual-consistency caching results if caching is used. It is recommended to set this value greater or equal to the number of processing threads. To force caching to execute in the same thread as the query (query results are blocked on caching completion), use a thread count of 0. Setups who use a Druid backend in programatic settings (sub-second re-querying) should consider setting this to 0 to prevent eventual consistency from biting overall performance in the ass. If this is you, please experiment to find out what setting works best. |Non-negative integer|`0`|
|
||||
|
||||
## Modules
|
||||
|
||||
As of Druid v0.6, most core Druid functionality has been compartmentalized into modules. There are a set of default modules that may apply to any node type, and there are specific modules for the different node types. Default modules are __lazily instantiated__. Each module has its own set of configuration.
|
||||
|
|
|
@ -0,0 +1,37 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package io.druid.guice.annotations;
|
||||
|
||||
import com.google.inject.BindingAnnotation;
|
||||
|
||||
import java.lang.annotation.ElementType;
|
||||
import java.lang.annotation.Retention;
|
||||
import java.lang.annotation.RetentionPolicy;
|
||||
import java.lang.annotation.Target;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
@BindingAnnotation
|
||||
@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
|
||||
@Retention(RetentionPolicy.RUNTIME)
|
||||
public @interface BackgroundCaching
|
||||
{
|
||||
}
|
|
@ -31,6 +31,7 @@ public interface CacheStrategy<T, CacheType, QueryType extends Query<T>>
|
|||
|
||||
public TypeReference<CacheType> getCacheObjectClazz();
|
||||
|
||||
// Resultant function must be THREAD SAFE
|
||||
public Function<T, CacheType> prepareForCache();
|
||||
|
||||
public Function<CacheType, T> pullFromCache();
|
||||
|
|
|
@ -30,6 +30,9 @@ import com.google.common.collect.Lists;
|
|||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Ordering;
|
||||
import com.google.common.collect.Sets;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
import com.google.inject.Inject;
|
||||
|
@ -44,6 +47,7 @@ import io.druid.client.cache.Cache;
|
|||
import io.druid.client.cache.CacheConfig;
|
||||
import io.druid.client.selector.QueryableDruidServer;
|
||||
import io.druid.client.selector.ServerSelector;
|
||||
import io.druid.guice.annotations.BackgroundCaching;
|
||||
import io.druid.guice.annotations.Smile;
|
||||
import io.druid.query.BySegmentResultValueClass;
|
||||
import io.druid.query.CacheStrategy;
|
||||
|
@ -64,11 +68,15 @@ import org.joda.time.Interval;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Queue;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
/**
|
||||
|
@ -81,6 +89,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
|
|||
private final Cache cache;
|
||||
private final ObjectMapper objectMapper;
|
||||
private final CacheConfig cacheConfig;
|
||||
private final ListeningExecutorService backgroundExecutorService;
|
||||
|
||||
@Inject
|
||||
public CachingClusteredClient(
|
||||
|
@ -88,6 +97,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
|
|||
TimelineServerView serverView,
|
||||
Cache cache,
|
||||
@Smile ObjectMapper objectMapper,
|
||||
@BackgroundCaching ExecutorService backgroundExecutorService,
|
||||
CacheConfig cacheConfig
|
||||
)
|
||||
{
|
||||
|
@ -96,6 +106,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
|
|||
this.cache = cache;
|
||||
this.objectMapper = objectMapper;
|
||||
this.cacheConfig = cacheConfig;
|
||||
this.backgroundExecutorService = MoreExecutors.listeningDecorator(backgroundExecutorService);
|
||||
|
||||
serverView.registerSegmentCallback(
|
||||
Executors.newFixedThreadPool(
|
||||
|
@ -135,7 +146,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
|
|||
final boolean isBySegment = query.getContextBySegment(false);
|
||||
|
||||
|
||||
ImmutableMap.Builder<String, Object> contextBuilder = new ImmutableMap.Builder<>();
|
||||
final ImmutableMap.Builder<String, Object> contextBuilder = new ImmutableMap.Builder<>();
|
||||
|
||||
final int priority = query.getContextPriority(0);
|
||||
contextBuilder.put("priority", priority);
|
||||
|
@ -146,9 +157,6 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
|
|||
}
|
||||
contextBuilder.put("intermediate", true);
|
||||
|
||||
final Query<T> rewrittenQuery = query.withOverriddenContext(contextBuilder.build());
|
||||
|
||||
|
||||
VersionedIntervalTimeline<String, ServerSelector> timeline = serverView.getTimeline(query.getDataSource());
|
||||
if (timeline == null) {
|
||||
return Sequences.empty();
|
||||
|
@ -159,7 +167,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
|
|||
|
||||
List<TimelineObjectHolder<String, ServerSelector>> serversLookup = Lists.newLinkedList();
|
||||
|
||||
for (Interval interval : rewrittenQuery.getIntervals()) {
|
||||
for (Interval interval : query.getIntervals()) {
|
||||
serversLookup.addAll(timeline.lookup(interval));
|
||||
}
|
||||
|
||||
|
@ -298,9 +306,16 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
|
|||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private void addSequencesFromServer(ArrayList<Pair<Interval, Sequence<T>>> listOfSequences)
|
||||
{
|
||||
listOfSequences.ensureCapacity(listOfSequences.size() + serverSegments.size());
|
||||
|
||||
final Query<Result<BySegmentResultValueClass<T>>> rewrittenQuery = (Query<Result<BySegmentResultValueClass<T>>>) query
|
||||
.withOverriddenContext(contextBuilder.build());
|
||||
|
||||
final Queue<ListenableFuture<?>> cacheFutures = new ConcurrentLinkedQueue<>();
|
||||
// Loop through each server, setting up the query and initiating it.
|
||||
// The data gets handled as a Future and parsed in the long Sequence chain in the resultSeqToAdd setter.
|
||||
for (Map.Entry<DruidServer, List<SegmentDescriptor>> entry : serverSegments.entrySet()) {
|
||||
final DruidServer server = entry.getKey();
|
||||
final List<SegmentDescriptor> descriptors = entry.getValue();
|
||||
|
@ -315,42 +330,61 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
|
|||
final MultipleSpecificSegmentSpec segmentSpec = new MultipleSpecificSegmentSpec(descriptors);
|
||||
List<Interval> intervals = segmentSpec.getIntervals();
|
||||
|
||||
if (!server.isAssignable() || !populateCache || isBySegment) {
|
||||
if (!server.isAssignable() || !populateCache || isBySegment) { // Direct server queryable
|
||||
resultSeqToAdd = clientQueryable.run(query.withQuerySegmentSpec(segmentSpec), responseContext);
|
||||
} else {
|
||||
// this could be more efficient, since we only need to reorder results
|
||||
// for batches of segments with the same segment start time.
|
||||
|
||||
} else { // Requires some manipulation on broker side
|
||||
|
||||
final QueryRunner<Result<BySegmentResultValueClass<T>>> clientQueryableWithSegments = clientQueryable;
|
||||
|
||||
final Sequence<Result<BySegmentResultValueClass<T>>> runningSequence = clientQueryableWithSegments.run(
|
||||
rewrittenQuery.withQuerySegmentSpec(segmentSpec),
|
||||
responseContext
|
||||
);
|
||||
resultSeqToAdd = toolChest.mergeSequencesUnordered(
|
||||
Sequences.map(
|
||||
clientQueryable.run(rewrittenQuery.withQuerySegmentSpec(segmentSpec), responseContext),
|
||||
new Function<Object, Sequence<T>>()
|
||||
Sequences.<Result<BySegmentResultValueClass<T>>, Sequence<T>>map(
|
||||
runningSequence,
|
||||
new Function<Result<BySegmentResultValueClass<T>>, Sequence<T>>()
|
||||
{
|
||||
private final Function<T, Object> cacheFn = strategy.prepareForCache();
|
||||
|
||||
// Acctually do something with the results
|
||||
@Override
|
||||
public Sequence<T> apply(Object input)
|
||||
public Sequence<T> apply(Result<BySegmentResultValueClass<T>> input)
|
||||
{
|
||||
Result<Object> result = (Result<Object>) input;
|
||||
final BySegmentResultValueClass<T> value = (BySegmentResultValueClass<T>) result.getValue();
|
||||
final BySegmentResultValueClass<T> value = input.getValue();
|
||||
|
||||
final List<Object> cacheData = Lists.newLinkedList();
|
||||
|
||||
return Sequences.withEffect(
|
||||
Sequences.map(
|
||||
Sequences.map(
|
||||
Sequences.simple(value.getResults()),
|
||||
return Sequences.<T>withEffect(
|
||||
Sequences.<T, T>map(
|
||||
Sequences.<T, T>map(
|
||||
Sequences.<T>simple(value.getResults()),
|
||||
new Function<T, T>()
|
||||
{
|
||||
@Override
|
||||
public T apply(T input)
|
||||
public T apply(final T input)
|
||||
{
|
||||
cacheData.add(cacheFn.apply(input));
|
||||
cacheFutures.add(
|
||||
backgroundExecutorService.submit(
|
||||
new Runnable()
|
||||
{
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
cacheData.add(cacheFn.apply(input));
|
||||
}
|
||||
}
|
||||
)
|
||||
);
|
||||
return input;
|
||||
}
|
||||
}
|
||||
),
|
||||
toolChest.makePreComputeManipulatorFn(
|
||||
rewrittenQuery,
|
||||
// Ick... most makePreComputeManipulatorFn directly cast to their ToolChest query type of choice
|
||||
// This casting is sub-optimal, but hasn't caused any major problems yet...
|
||||
(Query) rewrittenQuery,
|
||||
MetricManipulatorFns.deserializing()
|
||||
)
|
||||
),
|
||||
|
@ -363,12 +397,19 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
|
|||
String.format("%s_%s", value.getSegmentId(), value.getInterval())
|
||||
);
|
||||
if (cachePopulator != null) {
|
||||
cachePopulator.populate(cacheData);
|
||||
try {
|
||||
Futures.allAsList(cacheFutures).get();
|
||||
cachePopulator.populate(cacheData);
|
||||
}
|
||||
catch (Exception e) {
|
||||
log.error(e, "Error populating cache");
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
MoreExecutors.sameThreadExecutor()
|
||||
);
|
||||
backgroundExecutorService
|
||||
);// End withEffect
|
||||
}
|
||||
}
|
||||
)
|
||||
|
@ -384,7 +425,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
|
|||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}// End of Supplier
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -393,7 +434,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
|
|||
QueryToolChest<T, Query<T>> toolChest
|
||||
)
|
||||
{
|
||||
if(sequencesByInterval.isEmpty()) {
|
||||
if (sequencesByInterval.isEmpty()) {
|
||||
return Sequences.empty();
|
||||
}
|
||||
|
||||
|
@ -413,16 +454,16 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
|
|||
|
||||
unordered.add(current.rhs);
|
||||
|
||||
while(iterator.hasNext()) {
|
||||
while (iterator.hasNext()) {
|
||||
Pair<Interval, Sequence<T>> next = iterator.next();
|
||||
if(!next.lhs.overlaps(current.lhs)) {
|
||||
if (!next.lhs.overlaps(current.lhs)) {
|
||||
orderedSequences.add(toolChest.mergeSequencesUnordered(Sequences.simple(unordered)));
|
||||
unordered = Lists.newLinkedList();
|
||||
}
|
||||
unordered.add(next.rhs);
|
||||
current = next;
|
||||
}
|
||||
if(!unordered.isEmpty()) {
|
||||
if (!unordered.isEmpty()) {
|
||||
orderedSequences.add(toolChest.mergeSequencesUnordered(Sequences.simple(unordered)));
|
||||
}
|
||||
|
||||
|
|
|
@ -25,10 +25,14 @@ import com.google.common.base.Function;
|
|||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.Iterators;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import com.metamx.common.guava.BaseSequence;
|
||||
import com.metamx.common.guava.Sequence;
|
||||
import com.metamx.common.guava.Sequences;
|
||||
import com.metamx.common.logger.Logger;
|
||||
import io.druid.client.cache.Cache;
|
||||
import io.druid.client.cache.CacheConfig;
|
||||
import io.druid.query.CacheStrategy;
|
||||
|
@ -38,13 +42,18 @@ import io.druid.query.QueryToolChest;
|
|||
import io.druid.query.SegmentDescriptor;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
|
||||
public class CachingQueryRunner<T> implements QueryRunner<T>
|
||||
{
|
||||
|
||||
private static final Logger log = new Logger(CachingQueryRunner.class);
|
||||
private final String segmentIdentifier;
|
||||
private final SegmentDescriptor segmentDescriptor;
|
||||
private final QueryRunner<T> base;
|
||||
|
@ -52,6 +61,7 @@ public class CachingQueryRunner<T> implements QueryRunner<T>
|
|||
private final Cache cache;
|
||||
private final ObjectMapper mapper;
|
||||
private final CacheConfig cacheConfig;
|
||||
private final ListeningExecutorService backgroundExecutorService;
|
||||
|
||||
public CachingQueryRunner(
|
||||
String segmentIdentifier,
|
||||
|
@ -60,6 +70,7 @@ public class CachingQueryRunner<T> implements QueryRunner<T>
|
|||
Cache cache,
|
||||
QueryToolChest toolchest,
|
||||
QueryRunner<T> base,
|
||||
ExecutorService backgroundExecutorService,
|
||||
CacheConfig cacheConfig
|
||||
)
|
||||
{
|
||||
|
@ -69,6 +80,7 @@ public class CachingQueryRunner<T> implements QueryRunner<T>
|
|||
this.toolChest = toolchest;
|
||||
this.cache = cache;
|
||||
this.mapper = mapper;
|
||||
this.backgroundExecutorService = MoreExecutors.listeningDecorator(backgroundExecutorService);
|
||||
this.cacheConfig = cacheConfig;
|
||||
}
|
||||
|
||||
|
@ -137,6 +149,7 @@ public class CachingQueryRunner<T> implements QueryRunner<T>
|
|||
}
|
||||
}
|
||||
|
||||
final Collection<ListenableFuture<?>> cacheFutures = Collections.synchronizedList(Lists.<ListenableFuture<?>>newLinkedList());
|
||||
if (populateCache) {
|
||||
final Function cacheFn = strategy.prepareForCache();
|
||||
final List<Object> cacheResults = Lists.newLinkedList();
|
||||
|
@ -147,9 +160,20 @@ public class CachingQueryRunner<T> implements QueryRunner<T>
|
|||
new Function<T, T>()
|
||||
{
|
||||
@Override
|
||||
public T apply(T input)
|
||||
public T apply(final T input)
|
||||
{
|
||||
cacheResults.add(cacheFn.apply(input));
|
||||
cacheFutures.add(
|
||||
backgroundExecutorService.submit(
|
||||
new Runnable()
|
||||
{
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
cacheResults.add(cacheFn.apply(input));
|
||||
}
|
||||
}
|
||||
)
|
||||
);
|
||||
return input;
|
||||
}
|
||||
}
|
||||
|
@ -159,10 +183,17 @@ public class CachingQueryRunner<T> implements QueryRunner<T>
|
|||
@Override
|
||||
public void run()
|
||||
{
|
||||
CacheUtil.populate(cache, mapper, key, cacheResults);
|
||||
try {
|
||||
Futures.allAsList(cacheFutures).get();
|
||||
CacheUtil.populate(cache, mapper, key, cacheResults);
|
||||
}
|
||||
catch (Exception e) {
|
||||
log.error(e, "Error while getting future for cache task");
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
},
|
||||
MoreExecutors.sameThreadExecutor()
|
||||
backgroundExecutorService
|
||||
);
|
||||
} else {
|
||||
return base.run(query, responseContext);
|
||||
|
|
|
@ -20,8 +20,10 @@
|
|||
package io.druid.client.cache;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.fasterxml.jackson.annotation.JsonTypeName;
|
||||
import io.druid.query.Query;
|
||||
|
||||
import javax.validation.constraints.Min;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
|
@ -36,6 +38,10 @@ public class CacheConfig
|
|||
@JsonProperty
|
||||
private boolean populateCache = false;
|
||||
|
||||
@JsonProperty
|
||||
@Min(0)
|
||||
private int numBackgroundThreads = 0;
|
||||
|
||||
@JsonProperty
|
||||
private List<String> unCacheable = Arrays.asList(Query.GROUP_BY, Query.SELECT);
|
||||
|
||||
|
@ -48,6 +54,9 @@ public class CacheConfig
|
|||
{
|
||||
return useCache;
|
||||
}
|
||||
public int getNumBackgroundThreads(){
|
||||
return numBackgroundThreads;
|
||||
}
|
||||
|
||||
public boolean isQueryCacheable(Query query)
|
||||
{
|
||||
|
|
|
@ -20,6 +20,8 @@
|
|||
package io.druid.guice;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
import com.google.inject.Binder;
|
||||
import com.google.inject.Module;
|
||||
import com.google.inject.Provides;
|
||||
|
@ -29,8 +31,10 @@ import com.metamx.common.lifecycle.Lifecycle;
|
|||
import com.metamx.common.logger.Logger;
|
||||
import com.metamx.emitter.service.ServiceEmitter;
|
||||
import com.metamx.emitter.service.ServiceMetricEvent;
|
||||
import io.druid.client.cache.CacheConfig;
|
||||
import io.druid.collections.StupidPool;
|
||||
import io.druid.common.utils.VMUtils;
|
||||
import io.druid.guice.annotations.BackgroundCaching;
|
||||
import io.druid.guice.annotations.Global;
|
||||
import io.druid.guice.annotations.Processing;
|
||||
import io.druid.offheap.OffheapBufferPool;
|
||||
|
@ -40,6 +44,7 @@ import io.druid.query.PrioritizedExecutorService;
|
|||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
/**
|
||||
*/
|
||||
|
@ -54,6 +59,27 @@ public class DruidProcessingModule implements Module
|
|||
binder.bind(ExecutorServiceConfig.class).to(DruidProcessingConfig.class);
|
||||
}
|
||||
|
||||
@Provides
|
||||
@BackgroundCaching
|
||||
@LazySingleton
|
||||
public ExecutorService getBackgroundExecutorService(
|
||||
CacheConfig cacheConfig
|
||||
)
|
||||
{
|
||||
if (cacheConfig.getNumBackgroundThreads() > 0) {
|
||||
return Executors.newFixedThreadPool(
|
||||
cacheConfig.getNumBackgroundThreads(),
|
||||
new ThreadFactoryBuilder()
|
||||
.setNameFormat("background-cacher-%d")
|
||||
.setDaemon(true)
|
||||
.setPriority(Thread.MIN_PRIORITY)
|
||||
.build()
|
||||
);
|
||||
} else {
|
||||
return MoreExecutors.sameThreadExecutor();
|
||||
}
|
||||
}
|
||||
|
||||
@Provides
|
||||
@Processing
|
||||
@ManageLifecycle
|
||||
|
|
|
@ -132,8 +132,7 @@ public class QueryResource
|
|||
: objectMapper.writerWithDefaultPrettyPrinter();
|
||||
|
||||
try {
|
||||
requestQuery = ByteStreams.toByteArray(req.getInputStream());
|
||||
query = objectMapper.readValue(requestQuery, Query.class);
|
||||
query = objectMapper.readValue(req.getInputStream(), Query.class);
|
||||
queryId = query.getId();
|
||||
if (queryId == null) {
|
||||
queryId = UUID.randomUUID().toString();
|
||||
|
|
|
@ -33,6 +33,7 @@ import io.druid.client.CachingQueryRunner;
|
|||
import io.druid.client.cache.Cache;
|
||||
import io.druid.client.cache.CacheConfig;
|
||||
import io.druid.collections.CountingMap;
|
||||
import io.druid.guice.annotations.BackgroundCaching;
|
||||
import io.druid.guice.annotations.Processing;
|
||||
import io.druid.guice.annotations.Smile;
|
||||
import io.druid.query.BySegmentQueryRunner;
|
||||
|
@ -80,6 +81,7 @@ public class ServerManager implements QuerySegmentWalker
|
|||
private final QueryRunnerFactoryConglomerate conglomerate;
|
||||
private final ServiceEmitter emitter;
|
||||
private final ExecutorService exec;
|
||||
private final ExecutorService cachingExec;
|
||||
private final Map<String, VersionedIntervalTimeline<String, ReferenceCountingSegment>> dataSources;
|
||||
private final CountingMap<String> dataSourceSizes = new CountingMap<String>();
|
||||
private final CountingMap<String> dataSourceCounts = new CountingMap<String>();
|
||||
|
@ -93,6 +95,7 @@ public class ServerManager implements QuerySegmentWalker
|
|||
QueryRunnerFactoryConglomerate conglomerate,
|
||||
ServiceEmitter emitter,
|
||||
@Processing ExecutorService exec,
|
||||
@BackgroundCaching ExecutorService cachingExec,
|
||||
@Smile ObjectMapper objectMapper,
|
||||
Cache cache,
|
||||
CacheConfig cacheConfig
|
||||
|
@ -103,6 +106,7 @@ public class ServerManager implements QuerySegmentWalker
|
|||
this.emitter = emitter;
|
||||
|
||||
this.exec = exec;
|
||||
this.cachingExec = cachingExec;
|
||||
this.cache = cache;
|
||||
this.objectMapper = objectMapper;
|
||||
|
||||
|
@ -423,6 +427,7 @@ public class ServerManager implements QuerySegmentWalker
|
|||
new ReferenceCountingSegmentQueryRunner<T>(factory, adapter),
|
||||
"scan/time"
|
||||
),
|
||||
cachingExec,
|
||||
cacheConfig
|
||||
)
|
||||
)
|
||||
|
|
|
@ -34,6 +34,7 @@ import com.google.common.collect.Maps;
|
|||
import com.google.common.collect.Ordering;
|
||||
import com.google.common.hash.HashFunction;
|
||||
import com.google.common.hash.Hashing;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import com.metamx.common.ISE;
|
||||
import com.metamx.common.Pair;
|
||||
import com.metamx.common.guava.FunctionalIterable;
|
||||
|
@ -1288,7 +1289,7 @@ public class CachingClusteredClientTest
|
|||
@Override
|
||||
public void run()
|
||||
{
|
||||
HashMap<String,List> context = new HashMap<String, List>();
|
||||
HashMap<String, List> context = new HashMap<String, List>();
|
||||
for (int i = 0; i < numTimesToQuery; ++i) {
|
||||
TestHelper.assertExpectedResults(
|
||||
new MergeIterable<>(
|
||||
|
@ -1900,6 +1901,7 @@ public class CachingClusteredClientTest
|
|||
},
|
||||
cache,
|
||||
jsonMapper,
|
||||
MoreExecutors.sameThreadExecutor(),
|
||||
new CacheConfig()
|
||||
{
|
||||
@Override
|
||||
|
|
|
@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableMap;
|
|||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.Iterators;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import com.metamx.common.ISE;
|
||||
import com.metamx.common.guava.ResourceClosingSequence;
|
||||
import com.metamx.common.guava.Sequence;
|
||||
|
@ -129,6 +130,7 @@ public class CachingQueryRunnerTest
|
|||
return resultSeq;
|
||||
}
|
||||
},
|
||||
MoreExecutors.sameThreadExecutor(),
|
||||
new CacheConfig()
|
||||
{
|
||||
@Override
|
||||
|
@ -231,6 +233,7 @@ public class CachingQueryRunnerTest
|
|||
return Sequences.empty();
|
||||
}
|
||||
},
|
||||
MoreExecutors.sameThreadExecutor(),
|
||||
new CacheConfig()
|
||||
{
|
||||
@Override
|
||||
|
|
|
@ -0,0 +1,157 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package io.druid.client.cache;
|
||||
|
||||
import com.fasterxml.jackson.databind.Module;
|
||||
import com.fasterxml.jackson.databind.module.SimpleModule;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.inject.Binder;
|
||||
import com.google.inject.Injector;
|
||||
import io.druid.guice.GuiceInjectors;
|
||||
import io.druid.guice.JsonConfigProvider;
|
||||
import io.druid.guice.JsonConfigurator;
|
||||
import io.druid.initialization.DruidModule;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public class CacheConfigTest
|
||||
{
|
||||
static Injector injector;
|
||||
static JsonConfigurator configurator;
|
||||
JsonConfigProvider<CacheConfig> configProvider;
|
||||
private static final String propertyPrefix = "io.druid.test.cache";
|
||||
|
||||
@BeforeClass
|
||||
public static void populateStatics(){
|
||||
injector = GuiceInjectors.makeStartupInjectorWithModules(ImmutableList.<com.google.inject.Module>of(new CacheConfigTestModule()));
|
||||
configurator = injector.getBinding(JsonConfigurator.class).getProvider().get();
|
||||
}
|
||||
private static class CacheConfigTestModule implements DruidModule
|
||||
{
|
||||
|
||||
@Override
|
||||
public List<? extends Module> getJacksonModules()
|
||||
{
|
||||
return ImmutableList.<Module>of(new SimpleModule());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void configure(Binder binder)
|
||||
{
|
||||
JsonConfigProvider.bind(binder,propertyPrefix,CacheConfig.class);
|
||||
}
|
||||
}
|
||||
private Properties properties = new Properties();
|
||||
|
||||
@Before
|
||||
public void setupTest(){
|
||||
properties.clear();
|
||||
configProvider = JsonConfigProvider.of(propertyPrefix, CacheConfig.class);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInjection1()
|
||||
{
|
||||
properties.put(propertyPrefix + ".numBackgroundThreads", "5");
|
||||
properties.put(propertyPrefix + ".populateCache", "true");
|
||||
properties.put(propertyPrefix + ".useCache", "true");
|
||||
properties.put(propertyPrefix + ".unCacheable", "[\"a\",\"b\"]");
|
||||
|
||||
configProvider.inject(properties, configurator);
|
||||
CacheConfig config = configProvider.get().get();
|
||||
|
||||
injector.injectMembers(config);
|
||||
Assert.assertEquals(5, config.getNumBackgroundThreads());
|
||||
Assert.assertEquals(true, config.isPopulateCache());
|
||||
Assert.assertEquals(true, config.isUseCache());
|
||||
}
|
||||
@Test
|
||||
public void testInjection2()
|
||||
{
|
||||
properties.put(propertyPrefix + ".numBackgroundThreads", "99");
|
||||
properties.put(propertyPrefix + ".populateCache", "false");
|
||||
properties.put(propertyPrefix + ".useCache", "false");
|
||||
|
||||
configProvider.inject(properties, configurator);
|
||||
CacheConfig config = configProvider.get().get();
|
||||
|
||||
Assert.assertEquals(99, config.getNumBackgroundThreads());
|
||||
Assert.assertEquals(false, config.isPopulateCache());
|
||||
Assert.assertEquals(false, config.isUseCache());
|
||||
}
|
||||
|
||||
@Test(expected = com.google.inject.ProvisionException.class)
|
||||
public void testValidationError()
|
||||
{
|
||||
properties.put(propertyPrefix + ".numBackgroundThreads", "-1");
|
||||
|
||||
configProvider.inject(properties, configurator);
|
||||
CacheConfig config = configProvider.get().get();
|
||||
Assert.assertNotEquals(-1, config.getNumBackgroundThreads());
|
||||
}
|
||||
|
||||
|
||||
@Test(expected = com.google.inject.ProvisionException.class)
|
||||
public void testValidationInsaneError()
|
||||
{
|
||||
properties.put(propertyPrefix + ".numBackgroundThreads", "BABBA YAGA");
|
||||
configProvider.inject(properties, configurator);
|
||||
CacheConfig config = configProvider.get().get();
|
||||
throw new IllegalStateException("Should have already failed");
|
||||
}
|
||||
|
||||
@Test(expected = com.google.inject.ProvisionException.class)
|
||||
public void testTRUE()
|
||||
{
|
||||
properties.put(propertyPrefix + ".populateCache", "TRUE");
|
||||
configProvider.inject(properties, configurator);
|
||||
CacheConfig config = configProvider.get().get();
|
||||
throw new IllegalStateException("Should have already failed");
|
||||
}
|
||||
|
||||
@Test(expected = com.google.inject.ProvisionException.class)
|
||||
public void testFALSE()
|
||||
{
|
||||
properties.put(propertyPrefix + ".populateCache", "FALSE");
|
||||
configProvider.inject(properties, configurator);
|
||||
CacheConfig config = configProvider.get().get();
|
||||
throw new IllegalStateException("Should have already failed");
|
||||
}
|
||||
|
||||
|
||||
@Test(expected = com.google.inject.ProvisionException.class)
|
||||
public void testFaLse()
|
||||
{
|
||||
properties.put(propertyPrefix + ".populateCache", "FaLse");
|
||||
configProvider.inject(properties, configurator);
|
||||
CacheConfig config = configProvider.get().get();
|
||||
throw new IllegalStateException("Should have already failed");
|
||||
}
|
||||
|
||||
|
||||
}
|
|
@ -26,6 +26,7 @@ import com.google.common.base.Throwables;
|
|||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import com.metamx.common.IAE;
|
||||
import com.metamx.common.MapUtils;
|
||||
import com.metamx.common.Pair;
|
||||
|
@ -141,7 +142,10 @@ public class ServerManagerTest
|
|||
}
|
||||
},
|
||||
new NoopServiceEmitter(),
|
||||
serverManagerExec, new DefaultObjectMapper(), new LocalCacheProvider().get(),
|
||||
serverManagerExec,
|
||||
MoreExecutors.sameThreadExecutor(),
|
||||
new DefaultObjectMapper(),
|
||||
new LocalCacheProvider().get(),
|
||||
new CacheConfig()
|
||||
);
|
||||
|
||||
|
@ -429,7 +433,7 @@ public class ServerManagerTest
|
|||
@Override
|
||||
public void run()
|
||||
{
|
||||
Map<String,Object> context = new HashMap<String, Object>();
|
||||
Map<String, Object> context = new HashMap<String, Object>();
|
||||
Sequence<Result<SearchResultValue>> seq = runner.run(query, context);
|
||||
Sequences.toList(seq, Lists.<Result<SearchResultValue>>newArrayList());
|
||||
Iterator<SegmentForTesting> adaptersIter = factory.getAdapters().iterator();
|
||||
|
|
|
@ -90,6 +90,7 @@ public class ZkCoordinatorTest extends CuratorTestBase
|
|||
new NoopQueryRunnerFactoryConglomerate(),
|
||||
new NoopServiceEmitter(),
|
||||
MoreExecutors.sameThreadExecutor(),
|
||||
MoreExecutors.sameThreadExecutor(),
|
||||
new DefaultObjectMapper(),
|
||||
new LocalCacheProvider().get(),
|
||||
new CacheConfig()
|
||||
|
|
Loading…
Reference in New Issue