From 1c5d2b6dc5e9f708922a2accd24e3e6b3f6edc5e Mon Sep 17 00:00:00 2001 From: nishantmonu51 Date: Tue, 25 Mar 2014 13:27:30 +0530 Subject: [PATCH] review comments --- .../druid/client/CachePopulatingQueryRunner.java | 15 ++++++++++++--- .../io/druid/client/CachingClusteredClient.java | 10 +++++++--- .../main/java/io/druid/client/cache/Cache.java | 2 -- .../main/java/io/druid/client/cache/MapCache.java | 6 ------ .../io/druid/client/cache/MemcachedCache.java | 8 -------- .../druid/server/coordination/ServerManager.java | 9 +++++++-- .../druid/client/CachingClusteredClientTest.java | 3 ++- .../server/coordination/ServerManagerTest.java | 4 +++- .../server/coordination/ZkCoordinatorTest.java | 4 +++- 9 files changed, 34 insertions(+), 27 deletions(-) diff --git a/server/src/main/java/io/druid/client/CachePopulatingQueryRunner.java b/server/src/main/java/io/druid/client/CachePopulatingQueryRunner.java index d834f2c8696..2a91ee2237b 100644 --- a/server/src/main/java/io/druid/client/CachePopulatingQueryRunner.java +++ b/server/src/main/java/io/druid/client/CachePopulatingQueryRunner.java @@ -23,6 +23,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; import io.druid.client.cache.Cache; +import io.druid.client.cache.CacheConfig; +import io.druid.client.cache.MapCache; import io.druid.query.CacheStrategy; import io.druid.query.Query; import io.druid.query.QueryRunner; @@ -40,12 +42,15 @@ public class CachePopulatingQueryRunner implements QueryRunner private final QueryToolChest toolChest; private final Cache cache; private final ObjectMapper mapper; + private final CacheConfig cacheConfig; public CachePopulatingQueryRunner( String segmentIdentifier, SegmentDescriptor segmentDescriptor, ObjectMapper mapper, - Cache cache, QueryToolChest toolchest, - QueryRunner base + Cache cache, + QueryToolChest toolchest, + QueryRunner base, + CacheConfig cacheConfig ) { this.base = base; @@ -54,6 +59,7 @@ public class CachePopulatingQueryRunner implements QueryRunner this.toolChest = toolchest; this.cache = cache; this.mapper = mapper; + this.cacheConfig = cacheConfig; } @Override @@ -63,7 +69,10 @@ public class CachePopulatingQueryRunner implements QueryRunner final CacheStrategy strategy = toolChest.getCacheStrategy(query); final boolean populateCache = Boolean.parseBoolean(query.getContextValue("populateCache", "true")) - && strategy != null && cache.getCacheConfig().isPopulateCache(); + && strategy != null + && cacheConfig.isPopulateCache() + // historical only populates distributed cache since the cache lookups are done at broker. + && !(cache instanceof MapCache) ; Sequence results = base.run(query); if (populateCache) { Cache.NamedKey key = CacheUtil.computeSegmentCacheKey( diff --git a/server/src/main/java/io/druid/client/CachingClusteredClient.java b/server/src/main/java/io/druid/client/CachingClusteredClient.java index 82eb182fb5f..c5bbbedd774 100644 --- a/server/src/main/java/io/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/io/druid/client/CachingClusteredClient.java @@ -40,6 +40,7 @@ import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; import com.metamx.emitter.EmittingLogger; import io.druid.client.cache.Cache; +import io.druid.client.cache.CacheConfig; import io.druid.client.selector.QueryableDruidServer; import io.druid.client.selector.ServerSelector; import io.druid.guice.annotations.Smile; @@ -79,19 +80,22 @@ public class CachingClusteredClient implements QueryRunner private final TimelineServerView serverView; private final Cache cache; private final ObjectMapper objectMapper; + private final CacheConfig cacheConfig; @Inject public CachingClusteredClient( QueryToolChestWarehouse warehouse, TimelineServerView serverView, Cache cache, - @Smile ObjectMapper objectMapper + @Smile ObjectMapper objectMapper, + CacheConfig cacheConfig ) { this.warehouse = warehouse; this.serverView = serverView; this.cache = cache; this.objectMapper = objectMapper; + this.cacheConfig = cacheConfig; serverView.registerSegmentCallback( Executors.newFixedThreadPool( @@ -122,9 +126,9 @@ public class CachingClusteredClient implements QueryRunner final boolean useCache = Boolean.parseBoolean(query.getContextValue("useCache", "true")) && strategy != null - && cache.getCacheConfig().isUseCache(); + && cacheConfig.isUseCache(); final boolean populateCache = Boolean.parseBoolean(query.getContextValue("populateCache", "true")) - && strategy != null && cache.getCacheConfig().isPopulateCache(); + && strategy != null && cacheConfig.isPopulateCache(); final boolean isBySegment = Boolean.parseBoolean(query.getContextValue("bySegment", "false")); diff --git a/server/src/main/java/io/druid/client/cache/Cache.java b/server/src/main/java/io/druid/client/cache/Cache.java index f3a12151a6d..6ed875edb0b 100644 --- a/server/src/main/java/io/druid/client/cache/Cache.java +++ b/server/src/main/java/io/druid/client/cache/Cache.java @@ -39,8 +39,6 @@ public interface Cache public CacheStats getStats(); - public CacheConfig getCacheConfig(); - public class NamedKey { final public String namespace; diff --git a/server/src/main/java/io/druid/client/cache/MapCache.java b/server/src/main/java/io/druid/client/cache/MapCache.java index 81d2f8c71df..6c37465d4db 100644 --- a/server/src/main/java/io/druid/client/cache/MapCache.java +++ b/server/src/main/java/io/druid/client/cache/MapCache.java @@ -153,10 +153,4 @@ public class MapCache implements Cache retVal.rewind(); return retVal; } - - @Override - public CacheConfig getCacheConfig() - { - return config; - } } diff --git a/server/src/main/java/io/druid/client/cache/MemcachedCache.java b/server/src/main/java/io/druid/client/cache/MemcachedCache.java index dfe84b3bc07..2dc3c031775 100644 --- a/server/src/main/java/io/druid/client/cache/MemcachedCache.java +++ b/server/src/main/java/io/druid/client/cache/MemcachedCache.java @@ -101,7 +101,6 @@ public class MemcachedCache implements Cache private final AtomicLong timeoutCount = new AtomicLong(0); private final AtomicLong errorCount = new AtomicLong(0); - private final CacheConfig config; MemcachedCache(MemcachedClientIF client, MemcachedCacheConfig config) { Preconditions.checkArgument(config.getMemcachedPrefix().length() <= MAX_PREFIX_LENGTH, @@ -112,7 +111,6 @@ public class MemcachedCache implements Cache this.expiration = config.getExpiration(); this.client = client; this.memcachedPrefix = config.getMemcachedPrefix(); - this.config = config; } @Override @@ -129,12 +127,6 @@ public class MemcachedCache implements Cache ); } - @Override - public CacheConfig getCacheConfig() - { - return config; - } - @Override public byte[] get(NamedKey key) { diff --git a/server/src/main/java/io/druid/server/coordination/ServerManager.java b/server/src/main/java/io/druid/server/coordination/ServerManager.java index bdd6042c194..b49fade0404 100644 --- a/server/src/main/java/io/druid/server/coordination/ServerManager.java +++ b/server/src/main/java/io/druid/server/coordination/ServerManager.java @@ -31,6 +31,7 @@ import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceMetricEvent; import io.druid.client.CachePopulatingQueryRunner; import io.druid.client.cache.Cache; +import io.druid.client.cache.CacheConfig; import io.druid.collections.CountingMap; import io.druid.guice.annotations.Processing; import io.druid.guice.annotations.Smile; @@ -83,6 +84,7 @@ public class ServerManager implements QuerySegmentWalker private final CountingMap dataSourceCounts = new CountingMap(); private final Cache cache; private final ObjectMapper objectMapper; + private final CacheConfig cacheConfig; @Inject public ServerManager( @@ -91,7 +93,8 @@ public class ServerManager implements QuerySegmentWalker ServiceEmitter emitter, @Processing ExecutorService exec, @Smile ObjectMapper objectMapper, - Cache cache + Cache cache, + CacheConfig cacheConfig ) { this.segmentLoader = segmentLoader; @@ -103,6 +106,7 @@ public class ServerManager implements QuerySegmentWalker this.objectMapper = objectMapper; this.dataSources = new HashMap<>(); + this.cacheConfig = cacheConfig; } public Map getDataSourceSizes() @@ -412,7 +416,8 @@ public class ServerManager implements QuerySegmentWalker objectMapper, cache, toolChest, - new ReferenceCountingSegmentQueryRunner(factory, adapter) + new ReferenceCountingSegmentQueryRunner(factory, adapter), + cacheConfig ) ) ).withWaitMeasuredFromNow(), diff --git a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java index aeca47ac82a..c3aa46a3226 100644 --- a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java +++ b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java @@ -1194,7 +1194,8 @@ public class CachingClusteredClientTest } }, cache, - jsonMapper + jsonMapper, + new CacheConfig() ); } diff --git a/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java b/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java index 2e0bb123a44..223c720c7d8 100644 --- a/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java +++ b/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java @@ -37,6 +37,7 @@ import com.metamx.common.guava.YieldingAccumulator; import com.metamx.common.guava.YieldingSequenceBase; import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.service.ServiceMetricEvent; +import io.druid.client.cache.CacheConfig; import io.druid.client.cache.LocalCacheProvider; import io.druid.granularity.QueryGranularity; import io.druid.jackson.DefaultObjectMapper; @@ -138,7 +139,8 @@ public class ServerManagerTest } }, new NoopServiceEmitter(), - serverManagerExec, new DefaultObjectMapper(), new LocalCacheProvider().get() + serverManagerExec, new DefaultObjectMapper(), new LocalCacheProvider().get(), + new CacheConfig() ); loadQueryable("test", "1", new Interval("P1d/2011-04-01")); diff --git a/server/src/test/java/io/druid/server/coordination/ZkCoordinatorTest.java b/server/src/test/java/io/druid/server/coordination/ZkCoordinatorTest.java index e12a0f6e805..d0a7b62f4be 100644 --- a/server/src/test/java/io/druid/server/coordination/ZkCoordinatorTest.java +++ b/server/src/test/java/io/druid/server/coordination/ZkCoordinatorTest.java @@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.util.concurrent.MoreExecutors; import com.metamx.common.logger.Logger; +import io.druid.client.cache.CacheConfig; import io.druid.client.cache.LocalCacheProvider; import io.druid.concurrent.Execs; import io.druid.curator.CuratorTestBase; @@ -83,7 +84,8 @@ public class ZkCoordinatorTest extends CuratorTestBase new NoopServiceEmitter(), MoreExecutors.sameThreadExecutor(), new DefaultObjectMapper(), - new LocalCacheProvider().get() + new LocalCacheProvider().get(), + new CacheConfig() ); final DruidServerMetadata me = new DruidServerMetadata("dummyServer", "dummyHost", 0, "dummyType", "normal", 0);