diff --git a/docs/content/Configuration.md b/docs/content/Configuration.md
index 91beca02f25..f60fb6cd3f3 100644
--- a/docs/content/Configuration.md
+++ b/docs/content/Configuration.md
@@ -14,6 +14,30 @@ There are three JVM parameters that we set on all of our processes:
 2. `-Dfile.encoding=UTF-8` This is similar to timezone, we test assuming UTF-8. Local encodings might work, but they also might result in weird and interesting bugs.
 3. `-Djava.io.tmpdir=` Various parts of the system that interact with the file system do it via temporary files, and these files can get somewhat large. Many production systems are set up to have small (but fast) `/tmp` directories, which can be problematic with Druid so we recommend pointing the JVM’s tmp directory to something with a little more meat.
+## CLI specific
+
+Some of the CLI commands have configuration options that apply only to that particular command.
+
+### Server Historical
+
+|Property|Description|Allowed Values|Default|
+|--------|-----------|--------------|-------|
+|`druid.cache.type`|Set the caching type.|`local`, `memcached`|`local`|
+|`druid.historical.cache.useCache`|Allow cache to be used. Cache will NOT be used unless this is set.|`true`, `false`|`false`|
+|`druid.historical.cache.populateCache`|Allow cache to be populated. Cache will NOT be populated unless this is set.|`true`, `false`|`false`|
+|`druid.historical.cache.unCacheable`|Do not attempt to cache queries whose types are in this array.|Array of valid values for `queryType`|`["groupBy","select"]`|
+|`druid.historical.cache.numBackgroundThreads`|Number of background threads in the thread pool used for eventual-consistency caching of results, if caching is used. It is recommended to set this value greater than or equal to the number of processing threads. To force caching to execute in the same thread as the query (query results block on caching completion), use a thread count of 0. Setups that query a Druid backend programmatically (sub-second re-querying) should consider setting this to 0 to keep eventual consistency from hurting overall performance; if this applies to you, experiment to find the setting that works best.|Non-negative integer|`0`|
+
+### Server Broker
+
+|Property|Description|Allowed Values|Default|
+|--------|-----------|--------------|-------|
+|`druid.cache.type`|Set the caching type if caching is enabled.|`local`, `memcached`|`local`|
+|`druid.broker.cache.useCache`|Allow cache to be used. Cache will NOT be used unless this is set.|`true`, `false`|`false`|
+|`druid.broker.cache.populateCache`|Allow cache to be populated. Cache will NOT be populated unless this is set.|`true`, `false`|`false`|
+|`druid.broker.cache.unCacheable`|Do not attempt to cache queries whose types are in this array.|Array of valid values for `queryType`|`["groupBy","select"]`|
+|`druid.broker.cache.numBackgroundThreads`|Number of background threads in the thread pool used for eventual-consistency caching of results, if caching is used. It is recommended to set this value greater than or equal to the number of processing threads. To force caching to execute in the same thread as the query (query results block on caching completion), use a thread count of 0. Setups that query a Druid backend programmatically (sub-second re-querying) should consider setting this to 0 to keep eventual consistency from hurting overall performance; if this applies to you, experiment to find the setting that works best.|Non-negative integer|`0`|
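For illustration, a broker `runtime.properties` might combine the settings above like this sketch. The property names come from the tables; the specific values (a memcached cache, four background threads) are assumptions, not recommendations from this patch:

```properties
druid.cache.type=memcached
druid.broker.cache.useCache=true
druid.broker.cache.populateCache=true
# 0 (the default) keeps caching on the query thread; a positive count moves it off-thread
druid.broker.cache.numBackgroundThreads=4
```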
+
 ## Modules
 
 As of Druid v0.6, most core Druid functionality has been compartmentalized into modules. There are a set of default modules that may apply to any node type, and there are specific modules for the different node types. Default modules are __lazily instantiated__. Each module has its own set of configuration.
diff --git a/processing/src/main/java/io/druid/guice/annotations/BackgroundCaching.java b/processing/src/main/java/io/druid/guice/annotations/BackgroundCaching.java
new file mode 100644
index 00000000000..dc9b1cd0e77
--- /dev/null
+++ b/processing/src/main/java/io/druid/guice/annotations/BackgroundCaching.java
@@ -0,0 +1,37 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package io.druid.guice.annotations;
+
+import com.google.inject.BindingAnnotation;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ *
+ */
+@BindingAnnotation
+@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
+@Retention(RetentionPolicy.RUNTIME)
+public @interface BackgroundCaching
+{
+}
diff --git a/processing/src/main/java/io/druid/query/CacheStrategy.java b/processing/src/main/java/io/druid/query/CacheStrategy.java
index f77affcb5bf..baf40001005 100644
--- a/processing/src/main/java/io/druid/query/CacheStrategy.java
+++ b/processing/src/main/java/io/druid/query/CacheStrategy.java
@@ -31,6 +31,7 @@ public interface CacheStrategy>
 
   public TypeReference getCacheObjectClazz();
 
+  // Resultant function must be THREAD SAFE
   public Function prepareForCache();
 
   public Function pullFromCache();
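That THREAD SAFE note is load-bearing: with background caching, the function returned by `prepareForCache()` may be applied from several caching threads at once, so it must not touch shared mutable state. A minimal sketch of a conforming, stateless function (class and type names here are illustrative, not from this patch):

```java
import com.google.common.base.Charsets;
import com.google.common.base.Function;

// Illustrative only: the returned function is safe to call concurrently
// because it is stateless -- each apply() reads only its argument and
// writes no shared fields.
public class ExampleThreadSafeCacheFn
{
  public static Function<String, byte[]> prepareForCache()
  {
    return new Function<String, byte[]>()
    {
      @Override
      public byte[] apply(String result)
      {
        // Builds a fresh cache value per call; nothing is shared between callers.
        return result.getBytes(Charsets.UTF_8);
      }
    };
  }
}
```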
diff --git a/server/src/main/java/io/druid/client/CachingClusteredClient.java b/server/src/main/java/io/druid/client/CachingClusteredClient.java
index a70a04726e4..505c5039ec9 100644
--- a/server/src/main/java/io/druid/client/CachingClusteredClient.java
+++ b/server/src/main/java/io/druid/client/CachingClusteredClient.java
@@ -30,6 +30,9 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Ordering;
 import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.inject.Inject;
@@ -44,6 +47,7 @@ import io.druid.client.cache.Cache;
 import io.druid.client.cache.CacheConfig;
 import io.druid.client.selector.QueryableDruidServer;
 import io.druid.client.selector.ServerSelector;
+import io.druid.guice.annotations.BackgroundCaching;
 import io.druid.guice.annotations.Smile;
 import io.druid.query.BySegmentResultValueClass;
 import io.druid.query.CacheStrategy;
@@ -64,11 +68,15 @@ import org.joda.time.Interval;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
 /**
@@ -81,6 +89,7 @@ public class CachingClusteredClient implements QueryRunner
   private final Cache cache;
   private final ObjectMapper objectMapper;
   private final CacheConfig cacheConfig;
+  private final ListeningExecutorService backgroundExecutorService;
 
   @Inject
   public CachingClusteredClient(
@@ -88,6 +97,7 @@ public class CachingClusteredClient implements QueryRunner
       TimelineServerView serverView,
       Cache cache,
       @Smile ObjectMapper objectMapper,
+      @BackgroundCaching ExecutorService backgroundExecutorService,
       CacheConfig cacheConfig
   )
   {
@@ -96,6 +106,7 @@ public class CachingClusteredClient implements QueryRunner
     this.cache = cache;
     this.objectMapper = objectMapper;
     this.cacheConfig = cacheConfig;
+    this.backgroundExecutorService = MoreExecutors.listeningDecorator(backgroundExecutorService);
 
     serverView.registerSegmentCallback(
         Executors.newFixedThreadPool(
@@ -135,7 +146,7 @@ public class CachingClusteredClient implements QueryRunner
 
     final boolean isBySegment = query.getContextBySegment(false);
 
-    ImmutableMap.Builder contextBuilder = new ImmutableMap.Builder<>();
+    final ImmutableMap.Builder contextBuilder = new ImmutableMap.Builder<>();
 
     final int priority = query.getContextPriority(0);
     contextBuilder.put("priority", priority);
@@ -146,9 +157,6 @@ public class CachingClusteredClient implements QueryRunner
     }
     contextBuilder.put("intermediate", true);
 
-    final Query rewrittenQuery = query.withOverriddenContext(contextBuilder.build());
-
-
     VersionedIntervalTimeline timeline = serverView.getTimeline(query.getDataSource());
     if (timeline == null) {
       return Sequences.empty();
@@ -159,7 +167,7 @@ public class CachingClusteredClient implements QueryRunner
 
     List> serversLookup = Lists.newLinkedList();
 
-    for (Interval interval : rewrittenQuery.getIntervals()) {
+    for (Interval interval : query.getIntervals()) {
       serversLookup.addAll(timeline.lookup(interval));
     }
@@ -298,9 +306,16 @@ public class CachingClusteredClient implements QueryRunner
           }
         }
 
-        @SuppressWarnings("unchecked")
         private void addSequencesFromServer(ArrayList>> listOfSequences)
         {
+          listOfSequences.ensureCapacity(listOfSequences.size() + serverSegments.size());
+
+          final Query>> rewrittenQuery = (Query>>) query
+              .withOverriddenContext(contextBuilder.build());
+
+          final Queue> cacheFutures = new ConcurrentLinkedQueue<>();
+          // Loop through each server, setting up the query and initiating it.
+          // The data gets handled as a Future and parsed in the long Sequence chain in the resultSeqToAdd setter.
          for (Map.Entry> entry : serverSegments.entrySet()) {
            final DruidServer server = entry.getKey();
            final List descriptors = entry.getValue();
@@ -315,42 +330,61 @@
            final MultipleSpecificSegmentSpec segmentSpec = new MultipleSpecificSegmentSpec(descriptors);
            List intervals = segmentSpec.getIntervals();
 
-            if (!server.isAssignable() || !populateCache || isBySegment) {
+            if (!server.isAssignable() || !populateCache || isBySegment) { // Direct server queryable
              resultSeqToAdd = clientQueryable.run(query.withQuerySegmentSpec(segmentSpec), responseContext);
-            } else {
-              // this could be more efficient, since we only need to reorder results
-              // for batches of segments with the same segment start time.
+
+            } else { // Requires some manipulation on broker side
+
+              final QueryRunner>> clientQueryableWithSegments = clientQueryable;
+
+              final Sequence>> runningSequence = clientQueryableWithSegments.run(
+                  rewrittenQuery.withQuerySegmentSpec(segmentSpec),
+                  responseContext
+              );
              resultSeqToAdd = toolChest.mergeSequencesUnordered(
-                  Sequences.map(
-                      clientQueryable.run(rewrittenQuery.withQuerySegmentSpec(segmentSpec), responseContext),
-                      new Function>()
+                  Sequences.>, Sequence>map(
+                      runningSequence,
+                      new Function>, Sequence>()
                      {
                        private final Function cacheFn = strategy.prepareForCache();
 
+                        // Actually do something with the results
                        @Override
-                        public Sequence apply(Object input)
+                        public Sequence apply(Result> input)
                        {
-                          Result result = (Result) input;
-                          final BySegmentResultValueClass value = (BySegmentResultValueClass) result.getValue();
+                          final BySegmentResultValueClass value = input.getValue();
                          final List cacheData = Lists.newLinkedList();
 
-                          return Sequences.withEffect(
-                              Sequences.map(
-                                  Sequences.map(
-                                      Sequences.simple(value.getResults()),
+                          return Sequences.withEffect(
+                              Sequences.map(
+                                  Sequences.map(
+                                      Sequences.simple(value.getResults()),
                                      new Function()
                                      {
                                        @Override
-                                        public T apply(T input)
+                                        public T apply(final T input)
                                        {
-                                          cacheData.add(cacheFn.apply(input));
+                                          cacheFutures.add(
+                                              backgroundExecutorService.submit(
+                                                  new Runnable()
+                                                  {
+                                                    @Override
+                                                    public void run()
+                                                    {
+                                                      cacheData.add(cacheFn.apply(input));
+                                                    }
+                                                  }
+                                              )
+                                          );
                                          return input;
                                        }
                                      }
                                  ),
                                  toolChest.makePreComputeManipulatorFn(
-                                      rewrittenQuery,
+                                      // Ick... most makePreComputeManipulatorFn implementations cast directly to their ToolChest's query type of choice.
+                                      // This casting is sub-optimal, but hasn't caused any major problems yet...
+                                      (Query) rewrittenQuery,
                                      MetricManipulatorFns.deserializing()
                                  )
                              ),
@@ -363,12 +397,19 @@
                                  String.format("%s_%s", value.getSegmentId(), value.getInterval())
                              );
                              if (cachePopulator != null) {
-                                cachePopulator.populate(cacheData);
+                                try {
+                                  Futures.allAsList(cacheFutures).get();
+                                  cachePopulator.populate(cacheData);
+                                }
+                                catch (Exception e) {
+                                  log.error(e, "Error populating cache");
+                                  throw Throwables.propagate(e);
+                                }
                              }
                            }
                          },
-                          MoreExecutors.sameThreadExecutor()
-                      );
+                          backgroundExecutorService
+                      ); // End withEffect
                    }
                  }
              )
@@ -384,7 +425,7 @@
          );
        }
      }
-    }
+    } // End of Supplier
    );
  }
@@ -393,7 +434,7 @@
      QueryToolChest> toolChest
  )
  {
-    if (sequencesByInterval.isEmpty()) {
+    if (sequencesByInterval.isEmpty()) {
      return Sequences.empty();
    }
 
@@ -413,16 +454,16 @@
 
    unordered.add(current.rhs);
 
-    while (iterator.hasNext()) {
+    while (iterator.hasNext()) {
      Pair> next = iterator.next();
-      if (!next.lhs.overlaps(current.lhs)) {
+      if (!next.lhs.overlaps(current.lhs)) {
        orderedSequences.add(toolChest.mergeSequencesUnordered(Sequences.simple(unordered)));
        unordered = Lists.newLinkedList();
      }
      unordered.add(next.rhs);
      current = next;
    }
-    if (!unordered.isEmpty()) {
+    if (!unordered.isEmpty()) {
      orderedSequences.add(toolChest.mergeSequencesUnordered(Sequences.simple(unordered)));
    }
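The pattern running through `CachingClusteredClient` above — submit each result's cache serialization to the background executor as results stream by, remember the `ListenableFuture`s, and block on `Futures.allAsList(...)` only when it is time to publish the segment's cache entry — can be sketched standalone. This is a minimal illustration assuming only Guava; the names and data are hypothetical, not Druid APIs:

```java
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;

// Sketch of the background-caching pattern: per-result work is submitted
// to an executor as results stream through, and the writer blocks on the
// accumulated futures only at publish time.
public class BackgroundCachingSketch
{
  public static void main(String[] args) throws Exception
  {
    final ListeningExecutorService exec =
        MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(4));
    final Queue<ListenableFuture<byte[]>> futures = new ConcurrentLinkedQueue<>();

    for (final String result : new String[]{"r1", "r2", "r3"}) {
      // Submit serialization without blocking the query path.
      futures.add(exec.submit(new Callable<byte[]>()
      {
        @Override
        public byte[] call()
        {
          return result.getBytes(); // stand-in for cacheFn.apply(result)
        }
      }));
    }

    // Only when publishing to the cache do we wait for everything.
    for (byte[] bytes : Futures.allAsList(futures).get()) {
      System.out.println(bytes.length + " bytes ready to cache");
    }
    exec.shutdown();
  }
}
```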
diff --git a/server/src/main/java/io/druid/client/CachingQueryRunner.java b/server/src/main/java/io/druid/client/CachingQueryRunner.java
index cb0616d98d3..9fdfea2a584 100644
--- a/server/src/main/java/io/druid/client/CachingQueryRunner.java
+++ b/server/src/main/java/io/druid/client/CachingQueryRunner.java
@@ -25,10 +25,14 @@ import com.google.common.base.Function;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.metamx.common.guava.BaseSequence;
 import com.metamx.common.guava.Sequence;
 import com.metamx.common.guava.Sequences;
+import com.metamx.common.logger.Logger;
 import io.druid.client.cache.Cache;
 import io.druid.client.cache.CacheConfig;
 import io.druid.query.CacheStrategy;
@@ -38,13 +42,18 @@ import io.druid.query.QueryToolChest;
 import io.druid.query.SegmentDescriptor;
 
 import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
 
 public class CachingQueryRunner implements QueryRunner
 {
-
+  private static final Logger log = new Logger(CachingQueryRunner.class);
   private final String segmentIdentifier;
   private final SegmentDescriptor segmentDescriptor;
   private final QueryRunner base;
@@ -52,6 +61,7 @@ public class CachingQueryRunner implements QueryRunner
   private final Cache cache;
   private final ObjectMapper mapper;
   private final CacheConfig cacheConfig;
+  private final ListeningExecutorService backgroundExecutorService;
 
   public CachingQueryRunner(
       String segmentIdentifier,
@@ -60,6 +70,7 @@ public class CachingQueryRunner implements QueryRunner
       Cache cache,
       QueryToolChest toolchest,
       QueryRunner base,
+      ExecutorService backgroundExecutorService,
       CacheConfig cacheConfig
   )
   {
@@ -69,6 +80,7 @@ public class CachingQueryRunner implements QueryRunner
     this.toolChest = toolchest;
     this.cache = cache;
     this.mapper = mapper;
+    this.backgroundExecutorService = MoreExecutors.listeningDecorator(backgroundExecutorService);
     this.cacheConfig = cacheConfig;
   }
 
@@ -137,6 +149,7 @@ public class CachingQueryRunner implements QueryRunner
       }
     }
 
+    final Collection> cacheFutures = Collections.synchronizedList(Lists.>newLinkedList());
     if (populateCache) {
       final Function cacheFn = strategy.prepareForCache();
       final List cacheResults = Lists.newLinkedList();
@@ -147,9 +160,20 @@ public class CachingQueryRunner implements QueryRunner
           new Function()
           {
             @Override
-            public T apply(T input)
+            public T apply(final T input)
             {
-              cacheResults.add(cacheFn.apply(input));
+              cacheFutures.add(
+                  backgroundExecutorService.submit(
+                      new Runnable()
+                      {
+                        @Override
+                        public void run()
+                        {
+                          cacheResults.add(cacheFn.apply(input));
+                        }
+                      }
+                  )
+              );
               return input;
             }
           }
@@ -159,10 +183,17 @@ public class CachingQueryRunner implements QueryRunner
           @Override
           public void run()
           {
-            CacheUtil.populate(cache, mapper, key, cacheResults);
+            try {
+              Futures.allAsList(cacheFutures).get();
+              CacheUtil.populate(cache, mapper, key, cacheResults);
+            }
+            catch (Exception e) {
+              log.error(e, "Error while getting future for cache task");
+              throw Throwables.propagate(e);
+            }
           }
         },
-        MoreExecutors.sameThreadExecutor()
+        backgroundExecutorService
     );
     } else {
       return base.run(query, responseContext);
diff --git a/server/src/main/java/io/druid/client/cache/CacheConfig.java b/server/src/main/java/io/druid/client/cache/CacheConfig.java
index 2e066e6703d..637d8f36ff0 100644
--- a/server/src/main/java/io/druid/client/cache/CacheConfig.java
+++ b/server/src/main/java/io/druid/client/cache/CacheConfig.java
@@ -20,8 +20,10 @@
 package io.druid.client.cache;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
 import io.druid.query.Query;
 
+import javax.validation.constraints.Min;
 import java.util.Arrays;
 import java.util.List;
 
@@ -36,6 +38,10 @@
   @JsonProperty
   private boolean populateCache = false;
 
+  @JsonProperty
+  @Min(0)
+  private int numBackgroundThreads = 0;
+
   @JsonProperty
   private List unCacheable = Arrays.asList(Query.GROUP_BY, Query.SELECT);
 
@@ -48,6 +54,9 @@
   {
     return useCache;
   }
+
+  public int getNumBackgroundThreads()
+  {
+    return numBackgroundThreads;
+  }
 
   public boolean isQueryCacheable(Query query)
   {
diff --git a/server/src/main/java/io/druid/guice/DruidProcessingModule.java b/server/src/main/java/io/druid/guice/DruidProcessingModule.java
index 714b47220cc..d7e1aa874fc 100644
--- a/server/src/main/java/io/druid/guice/DruidProcessingModule.java
+++ b/server/src/main/java/io/druid/guice/DruidProcessingModule.java
@@ -20,6 +20,8 @@
 package io.druid.guice;
 
 import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.inject.Binder;
 import com.google.inject.Module;
 import com.google.inject.Provides;
@@ -29,8 +31,10 @@
 import com.metamx.common.lifecycle.Lifecycle;
 import com.metamx.common.logger.Logger;
 import com.metamx.emitter.service.ServiceEmitter;
 import com.metamx.emitter.service.ServiceMetricEvent;
+import io.druid.client.cache.CacheConfig;
 import io.druid.collections.StupidPool;
 import io.druid.common.utils.VMUtils;
+import io.druid.guice.annotations.BackgroundCaching;
 import io.druid.guice.annotations.Global;
 import io.druid.guice.annotations.Processing;
 import io.druid.offheap.OffheapBufferPool;
@@ -40,6 +44,7 @@ import io.druid.query.PrioritizedExecutorService;
 
 import java.nio.ByteBuffer;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 /**
  */
@@ -54,6 +59,27 @@ public class DruidProcessingModule implements Module
     binder.bind(ExecutorServiceConfig.class).to(DruidProcessingConfig.class);
   }
 
+  @Provides
+  @BackgroundCaching
+  @LazySingleton
+  public ExecutorService getBackgroundExecutorService(
+      CacheConfig cacheConfig
+  )
+  {
+    if (cacheConfig.getNumBackgroundThreads() > 0) {
+      return Executors.newFixedThreadPool(
+          cacheConfig.getNumBackgroundThreads(),
+          new ThreadFactoryBuilder()
+              .setNameFormat("background-cacher-%d")
+              .setDaemon(true)
+              .setPriority(Thread.MIN_PRIORITY)
+              .build()
+      );
+    } else {
+      return MoreExecutors.sameThreadExecutor();
+    }
+  }
+
   @Provides
   @Processing
   @ManageLifecycle
diff --git a/server/src/main/java/io/druid/server/QueryResource.java b/server/src/main/java/io/druid/server/QueryResource.java
index 82af76f8d43..a928e8142c3 100644
--- a/server/src/main/java/io/druid/server/QueryResource.java
+++ b/server/src/main/java/io/druid/server/QueryResource.java
@@ -135,8 +135,7 @@ public class QueryResource
         : objectMapper.writerWithDefaultPrettyPrinter();
 
     try {
-      requestQuery = ByteStreams.toByteArray(req.getInputStream());
-      query = objectMapper.readValue(requestQuery, Query.class);
+      query = objectMapper.readValue(req.getInputStream(), Query.class);
 
       queryId = query.getId();
       if (queryId == null) {
         queryId = UUID.randomUUID().toString();
diff --git a/server/src/main/java/io/druid/server/coordination/ServerManager.java b/server/src/main/java/io/druid/server/coordination/ServerManager.java
index 292cad4b26e..33356aee9a9 100644
--- a/server/src/main/java/io/druid/server/coordination/ServerManager.java
+++ b/server/src/main/java/io/druid/server/coordination/ServerManager.java
@@ -33,6 +33,7 @@
 import io.druid.client.CachingQueryRunner;
 import io.druid.client.cache.Cache;
 import io.druid.client.cache.CacheConfig;
 import io.druid.collections.CountingMap;
+import io.druid.guice.annotations.BackgroundCaching;
 import io.druid.guice.annotations.Processing;
 import io.druid.guice.annotations.Smile;
 import io.druid.query.BySegmentQueryRunner;
@@ -80,6 +81,7 @@ public class ServerManager implements QuerySegmentWalker
   private final QueryRunnerFactoryConglomerate conglomerate;
   private final ServiceEmitter emitter;
   private final ExecutorService exec;
+  private final ExecutorService cachingExec;
   private final Map> dataSources;
   private final CountingMap dataSourceSizes = new CountingMap();
   private final CountingMap dataSourceCounts = new CountingMap();
@@ -93,6 +95,7 @@ public class ServerManager implements QuerySegmentWalker
       QueryRunnerFactoryConglomerate conglomerate,
       ServiceEmitter emitter,
       @Processing ExecutorService exec,
+      @BackgroundCaching ExecutorService cachingExec,
       @Smile ObjectMapper objectMapper,
       Cache cache,
       CacheConfig cacheConfig
@@ -103,6 +106,7 @@ public class ServerManager implements QuerySegmentWalker
     this.emitter = emitter;
 
     this.exec = exec;
+    this.cachingExec = cachingExec;
     this.cache = cache;
     this.objectMapper = objectMapper;
 
@@ -423,6 +427,7 @@ public class ServerManager implements QuerySegmentWalker
                 new ReferenceCountingSegmentQueryRunner(factory, adapter),
                 "scan/time"
             ),
+            cachingExec,
             cacheConfig
         )
     )
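One design note on the provider above: `numBackgroundThreads` defaults to 0, and the `sameThreadExecutor()` fallback means submitted cache work runs inline on the calling thread, so every future is already complete by the time `Futures.allAsList(...).get()` is reached — pre-patch blocking behavior is preserved. A small illustrative demo of that same-thread property (not from this diff):

```java
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

// With sameThreadExecutor(), submit() executes the task inline before
// returning, i.e. on the caller's own thread.
public class SameThreadFallbackDemo
{
  public static void main(String[] args)
  {
    ListeningExecutorService exec = MoreExecutors.sameThreadExecutor();
    exec.submit(new Runnable()
    {
      @Override
      public void run()
      {
        System.out.println("runs on:  " + Thread.currentThread().getName());
      }
    });
    // Prints the same thread name as the line above.
    System.out.println("caller on: " + Thread.currentThread().getName());
  }
}
```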
diff --git a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java
index 4220de989d3..e3b261c6ea4 100644
--- a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java
+++ b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java
@@ -34,6 +34,7 @@
 import com.google.common.collect.Maps;
 import com.google.common.collect.Ordering;
 import com.google.common.hash.HashFunction;
 import com.google.common.hash.Hashing;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.metamx.common.ISE;
 import com.metamx.common.Pair;
 import com.metamx.common.guava.FunctionalIterable;
@@ -1288,7 +1289,7 @@
       @Override
       public void run()
       {
-        HashMap context = new HashMap();
+        HashMap context = new HashMap();
         for (int i = 0; i < numTimesToQuery; ++i) {
           TestHelper.assertExpectedResults(
               new MergeIterable<>(
@@ -1900,6 +1901,7 @@
         },
         cache,
         jsonMapper,
+        MoreExecutors.sameThreadExecutor(),
         new CacheConfig()
         {
           @Override
diff --git a/server/src/test/java/io/druid/client/CachingQueryRunnerTest.java b/server/src/test/java/io/druid/client/CachingQueryRunnerTest.java
index 6252e9b7bcb..a6e78f1abd4 100644
--- a/server/src/test/java/io/druid/client/CachingQueryRunnerTest.java
+++ b/server/src/test/java/io/druid/client/CachingQueryRunnerTest.java
@@ -24,6 +24,7 @@
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.metamx.common.ISE;
 import com.metamx.common.guava.ResourceClosingSequence;
 import com.metamx.common.guava.Sequence;
@@ -129,6 +130,7 @@
             return resultSeq;
           }
         },
+        MoreExecutors.sameThreadExecutor(),
         new CacheConfig()
         {
           @Override
@@ -231,6 +233,7 @@
             return Sequences.empty();
           }
         },
+        MoreExecutors.sameThreadExecutor(),
         new CacheConfig()
         {
           @Override
diff --git a/server/src/test/java/io/druid/client/cache/CacheConfigTest.java b/server/src/test/java/io/druid/client/cache/CacheConfigTest.java
new file mode 100644
index 00000000000..2e5f2156080
--- /dev/null
+++ b/server/src/test/java/io/druid/client/cache/CacheConfigTest.java
@@ -0,0 +1,157 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package io.druid.client.cache;
+
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.common.collect.ImmutableList;
+import com.google.inject.Binder;
+import com.google.inject.Injector;
+import io.druid.guice.GuiceInjectors;
+import io.druid.guice.JsonConfigProvider;
+import io.druid.guice.JsonConfigurator;
+import io.druid.initialization.DruidModule;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Properties;
+
+/**
+ *
+ */
+public class CacheConfigTest
+{
+  static Injector injector;
+  static JsonConfigurator configurator;
+  JsonConfigProvider configProvider;
+  private static final String propertyPrefix = "io.druid.test.cache";
+
+  @BeforeClass
+  public static void populateStatics()
+  {
+    injector = GuiceInjectors.makeStartupInjectorWithModules(ImmutableList.of(new CacheConfigTestModule()));
+    configurator = injector.getBinding(JsonConfigurator.class).getProvider().get();
+  }
+
+  private static class CacheConfigTestModule implements DruidModule
+  {
+    @Override
+    public List getJacksonModules()
+    {
+      return ImmutableList.of(new SimpleModule());
+    }
+
+    @Override
+    public void configure(Binder binder)
+    {
+      JsonConfigProvider.bind(binder, propertyPrefix, CacheConfig.class);
+    }
+  }
+
+  private Properties properties = new Properties();
+
+  @Before
+  public void setupTest()
+  {
+    properties.clear();
+    configProvider = JsonConfigProvider.of(propertyPrefix, CacheConfig.class);
+  }
+
+  @Test
+  public void testInjection1()
+  {
+    properties.put(propertyPrefix + ".numBackgroundThreads", "5");
+    properties.put(propertyPrefix + ".populateCache", "true");
+    properties.put(propertyPrefix + ".useCache", "true");
+    properties.put(propertyPrefix + ".unCacheable", "[\"a\",\"b\"]");
+
+    configProvider.inject(properties, configurator);
+    CacheConfig config = configProvider.get().get();
+
+    injector.injectMembers(config);
+    Assert.assertEquals(5, config.getNumBackgroundThreads());
+    Assert.assertEquals(true, config.isPopulateCache());
+    Assert.assertEquals(true, config.isUseCache());
+  }
+
+  @Test
+  public void testInjection2()
+  {
+    properties.put(propertyPrefix + ".numBackgroundThreads", "99");
+    properties.put(propertyPrefix + ".populateCache", "false");
+    properties.put(propertyPrefix + ".useCache", "false");
+
+    configProvider.inject(properties, configurator);
+    CacheConfig config = configProvider.get().get();
+
+    Assert.assertEquals(99, config.getNumBackgroundThreads());
+    Assert.assertEquals(false, config.isPopulateCache());
+    Assert.assertEquals(false, config.isUseCache());
+  }
+
+  @Test(expected = com.google.inject.ProvisionException.class)
+  public void testValidationError()
+  {
+    properties.put(propertyPrefix + ".numBackgroundThreads", "-1");
+
+    configProvider.inject(properties, configurator);
+    CacheConfig config = configProvider.get().get();
+    Assert.assertNotEquals(-1, config.getNumBackgroundThreads());
+  }
+
+  @Test(expected = com.google.inject.ProvisionException.class)
+  public void testValidationInsaneError()
+  {
+    properties.put(propertyPrefix + ".numBackgroundThreads", "BABBA YAGA");
+    configProvider.inject(properties, configurator);
+    CacheConfig config = configProvider.get().get();
+    throw new IllegalStateException("Should have already failed");
+  }
+
+  @Test(expected = com.google.inject.ProvisionException.class)
+  public void testTRUE()
+  {
+    properties.put(propertyPrefix + ".populateCache", "TRUE");
+    configProvider.inject(properties, configurator);
+    CacheConfig config = configProvider.get().get();
+    throw new IllegalStateException("Should have already failed");
+  }
+
+  @Test(expected = com.google.inject.ProvisionException.class)
+  public void testFALSE()
+  {
+    properties.put(propertyPrefix + ".populateCache", "FALSE");
+    configProvider.inject(properties, configurator);
+    CacheConfig config = configProvider.get().get();
+    throw new IllegalStateException("Should have already failed");
+  }
+
+  @Test(expected = com.google.inject.ProvisionException.class)
+  public void testFaLse()
+  {
+    properties.put(propertyPrefix + ".populateCache", "FaLse");
+    configProvider.inject(properties, configurator);
+    CacheConfig config = configProvider.get().get();
+    throw new IllegalStateException("Should have already failed");
+  }
+}
diff --git a/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java b/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java
index 12fdeabe7da..1f440c5f15e 100644
--- a/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java
+++ b/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java
@@ -26,6 +26,7 @@
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.metamx.common.IAE;
 import com.metamx.common.MapUtils;
 import com.metamx.common.Pair;
@@ -141,7 +142,10 @@
           }
         },
         new NoopServiceEmitter(),
-        serverManagerExec, new DefaultObjectMapper(), new LocalCacheProvider().get(),
+        serverManagerExec,
+        MoreExecutors.sameThreadExecutor(),
+        new DefaultObjectMapper(),
+        new LocalCacheProvider().get(),
         new CacheConfig()
     );
 
@@ -429,7 +433,7 @@
       @Override
       public void run()
       {
-        Map context = new HashMap();
+        Map context = new HashMap();
         Sequence> seq = runner.run(query, context);
         Sequences.toList(seq, Lists.>newArrayList());
         Iterator adaptersIter = factory.getAdapters().iterator();
diff --git a/server/src/test/java/io/druid/server/coordination/ZkCoordinatorTest.java b/server/src/test/java/io/druid/server/coordination/ZkCoordinatorTest.java
index 666c72c0f5c..e6216082e50 100644
--- a/server/src/test/java/io/druid/server/coordination/ZkCoordinatorTest.java
+++ b/server/src/test/java/io/druid/server/coordination/ZkCoordinatorTest.java
@@ -90,6 +90,7 @@
         new NoopQueryRunnerFactoryConglomerate(),
         new NoopServiceEmitter(),
         MoreExecutors.sameThreadExecutor(),
+        MoreExecutors.sameThreadExecutor(),
         new DefaultObjectMapper(),
         new LocalCacheProvider().get(),
         new CacheConfig()