mirror of https://github.com/apache/druid.git
review comments
This commit is contained in:
parent
6a34c09e05
commit
1c5d2b6dc5
|
@ -23,6 +23,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import com.metamx.common.guava.Sequence;
|
import com.metamx.common.guava.Sequence;
|
||||||
import com.metamx.common.guava.Sequences;
|
import com.metamx.common.guava.Sequences;
|
||||||
import io.druid.client.cache.Cache;
|
import io.druid.client.cache.Cache;
|
||||||
|
import io.druid.client.cache.CacheConfig;
|
||||||
|
import io.druid.client.cache.MapCache;
|
||||||
import io.druid.query.CacheStrategy;
|
import io.druid.query.CacheStrategy;
|
||||||
import io.druid.query.Query;
|
import io.druid.query.Query;
|
||||||
import io.druid.query.QueryRunner;
|
import io.druid.query.QueryRunner;
|
||||||
|
@ -40,12 +42,15 @@ public class CachePopulatingQueryRunner<T> implements QueryRunner<T>
|
||||||
private final QueryToolChest toolChest;
|
private final QueryToolChest toolChest;
|
||||||
private final Cache cache;
|
private final Cache cache;
|
||||||
private final ObjectMapper mapper;
|
private final ObjectMapper mapper;
|
||||||
|
private final CacheConfig cacheConfig;
|
||||||
|
|
||||||
public CachePopulatingQueryRunner(
|
public CachePopulatingQueryRunner(
|
||||||
String segmentIdentifier,
|
String segmentIdentifier,
|
||||||
SegmentDescriptor segmentDescriptor, ObjectMapper mapper,
|
SegmentDescriptor segmentDescriptor, ObjectMapper mapper,
|
||||||
Cache cache, QueryToolChest toolchest,
|
Cache cache,
|
||||||
QueryRunner<T> base
|
QueryToolChest toolchest,
|
||||||
|
QueryRunner<T> base,
|
||||||
|
CacheConfig cacheConfig
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
this.base = base;
|
this.base = base;
|
||||||
|
@ -54,6 +59,7 @@ public class CachePopulatingQueryRunner<T> implements QueryRunner<T>
|
||||||
this.toolChest = toolchest;
|
this.toolChest = toolchest;
|
||||||
this.cache = cache;
|
this.cache = cache;
|
||||||
this.mapper = mapper;
|
this.mapper = mapper;
|
||||||
|
this.cacheConfig = cacheConfig;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -63,7 +69,10 @@ public class CachePopulatingQueryRunner<T> implements QueryRunner<T>
|
||||||
final CacheStrategy strategy = toolChest.getCacheStrategy(query);
|
final CacheStrategy strategy = toolChest.getCacheStrategy(query);
|
||||||
|
|
||||||
final boolean populateCache = Boolean.parseBoolean(query.getContextValue("populateCache", "true"))
|
final boolean populateCache = Boolean.parseBoolean(query.getContextValue("populateCache", "true"))
|
||||||
&& strategy != null && cache.getCacheConfig().isPopulateCache();
|
&& strategy != null
|
||||||
|
&& cacheConfig.isPopulateCache()
|
||||||
|
// historical only populates distributed cache since the cache lookups are done at broker.
|
||||||
|
&& !(cache instanceof MapCache) ;
|
||||||
Sequence<T> results = base.run(query);
|
Sequence<T> results = base.run(query);
|
||||||
if (populateCache) {
|
if (populateCache) {
|
||||||
Cache.NamedKey key = CacheUtil.computeSegmentCacheKey(
|
Cache.NamedKey key = CacheUtil.computeSegmentCacheKey(
|
||||||
|
|
|
@ -40,6 +40,7 @@ import com.metamx.common.guava.Sequence;
|
||||||
import com.metamx.common.guava.Sequences;
|
import com.metamx.common.guava.Sequences;
|
||||||
import com.metamx.emitter.EmittingLogger;
|
import com.metamx.emitter.EmittingLogger;
|
||||||
import io.druid.client.cache.Cache;
|
import io.druid.client.cache.Cache;
|
||||||
|
import io.druid.client.cache.CacheConfig;
|
||||||
import io.druid.client.selector.QueryableDruidServer;
|
import io.druid.client.selector.QueryableDruidServer;
|
||||||
import io.druid.client.selector.ServerSelector;
|
import io.druid.client.selector.ServerSelector;
|
||||||
import io.druid.guice.annotations.Smile;
|
import io.druid.guice.annotations.Smile;
|
||||||
|
@ -79,19 +80,22 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
|
||||||
private final TimelineServerView serverView;
|
private final TimelineServerView serverView;
|
||||||
private final Cache cache;
|
private final Cache cache;
|
||||||
private final ObjectMapper objectMapper;
|
private final ObjectMapper objectMapper;
|
||||||
|
private final CacheConfig cacheConfig;
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public CachingClusteredClient(
|
public CachingClusteredClient(
|
||||||
QueryToolChestWarehouse warehouse,
|
QueryToolChestWarehouse warehouse,
|
||||||
TimelineServerView serverView,
|
TimelineServerView serverView,
|
||||||
Cache cache,
|
Cache cache,
|
||||||
@Smile ObjectMapper objectMapper
|
@Smile ObjectMapper objectMapper,
|
||||||
|
CacheConfig cacheConfig
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
this.warehouse = warehouse;
|
this.warehouse = warehouse;
|
||||||
this.serverView = serverView;
|
this.serverView = serverView;
|
||||||
this.cache = cache;
|
this.cache = cache;
|
||||||
this.objectMapper = objectMapper;
|
this.objectMapper = objectMapper;
|
||||||
|
this.cacheConfig = cacheConfig;
|
||||||
|
|
||||||
serverView.registerSegmentCallback(
|
serverView.registerSegmentCallback(
|
||||||
Executors.newFixedThreadPool(
|
Executors.newFixedThreadPool(
|
||||||
|
@ -122,9 +126,9 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
|
||||||
|
|
||||||
final boolean useCache = Boolean.parseBoolean(query.getContextValue("useCache", "true"))
|
final boolean useCache = Boolean.parseBoolean(query.getContextValue("useCache", "true"))
|
||||||
&& strategy != null
|
&& strategy != null
|
||||||
&& cache.getCacheConfig().isUseCache();
|
&& cacheConfig.isUseCache();
|
||||||
final boolean populateCache = Boolean.parseBoolean(query.getContextValue("populateCache", "true"))
|
final boolean populateCache = Boolean.parseBoolean(query.getContextValue("populateCache", "true"))
|
||||||
&& strategy != null && cache.getCacheConfig().isPopulateCache();
|
&& strategy != null && cacheConfig.isPopulateCache();
|
||||||
final boolean isBySegment = Boolean.parseBoolean(query.getContextValue("bySegment", "false"));
|
final boolean isBySegment = Boolean.parseBoolean(query.getContextValue("bySegment", "false"));
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -39,8 +39,6 @@ public interface Cache
|
||||||
|
|
||||||
public CacheStats getStats();
|
public CacheStats getStats();
|
||||||
|
|
||||||
public CacheConfig getCacheConfig();
|
|
||||||
|
|
||||||
public class NamedKey
|
public class NamedKey
|
||||||
{
|
{
|
||||||
final public String namespace;
|
final public String namespace;
|
||||||
|
|
|
@ -153,10 +153,4 @@ public class MapCache implements Cache
|
||||||
retVal.rewind();
|
retVal.rewind();
|
||||||
return retVal;
|
return retVal;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public CacheConfig getCacheConfig()
|
|
||||||
{
|
|
||||||
return config;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -101,7 +101,6 @@ public class MemcachedCache implements Cache
|
||||||
private final AtomicLong timeoutCount = new AtomicLong(0);
|
private final AtomicLong timeoutCount = new AtomicLong(0);
|
||||||
private final AtomicLong errorCount = new AtomicLong(0);
|
private final AtomicLong errorCount = new AtomicLong(0);
|
||||||
|
|
||||||
private final CacheConfig config;
|
|
||||||
|
|
||||||
MemcachedCache(MemcachedClientIF client, MemcachedCacheConfig config) {
|
MemcachedCache(MemcachedClientIF client, MemcachedCacheConfig config) {
|
||||||
Preconditions.checkArgument(config.getMemcachedPrefix().length() <= MAX_PREFIX_LENGTH,
|
Preconditions.checkArgument(config.getMemcachedPrefix().length() <= MAX_PREFIX_LENGTH,
|
||||||
|
@ -112,7 +111,6 @@ public class MemcachedCache implements Cache
|
||||||
this.expiration = config.getExpiration();
|
this.expiration = config.getExpiration();
|
||||||
this.client = client;
|
this.client = client;
|
||||||
this.memcachedPrefix = config.getMemcachedPrefix();
|
this.memcachedPrefix = config.getMemcachedPrefix();
|
||||||
this.config = config;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -129,12 +127,6 @@ public class MemcachedCache implements Cache
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public CacheConfig getCacheConfig()
|
|
||||||
{
|
|
||||||
return config;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public byte[] get(NamedKey key)
|
public byte[] get(NamedKey key)
|
||||||
{
|
{
|
||||||
|
|
|
@ -31,6 +31,7 @@ import com.metamx.emitter.service.ServiceEmitter;
|
||||||
import com.metamx.emitter.service.ServiceMetricEvent;
|
import com.metamx.emitter.service.ServiceMetricEvent;
|
||||||
import io.druid.client.CachePopulatingQueryRunner;
|
import io.druid.client.CachePopulatingQueryRunner;
|
||||||
import io.druid.client.cache.Cache;
|
import io.druid.client.cache.Cache;
|
||||||
|
import io.druid.client.cache.CacheConfig;
|
||||||
import io.druid.collections.CountingMap;
|
import io.druid.collections.CountingMap;
|
||||||
import io.druid.guice.annotations.Processing;
|
import io.druid.guice.annotations.Processing;
|
||||||
import io.druid.guice.annotations.Smile;
|
import io.druid.guice.annotations.Smile;
|
||||||
|
@ -83,6 +84,7 @@ public class ServerManager implements QuerySegmentWalker
|
||||||
private final CountingMap<String> dataSourceCounts = new CountingMap<String>();
|
private final CountingMap<String> dataSourceCounts = new CountingMap<String>();
|
||||||
private final Cache cache;
|
private final Cache cache;
|
||||||
private final ObjectMapper objectMapper;
|
private final ObjectMapper objectMapper;
|
||||||
|
private final CacheConfig cacheConfig;
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public ServerManager(
|
public ServerManager(
|
||||||
|
@ -91,7 +93,8 @@ public class ServerManager implements QuerySegmentWalker
|
||||||
ServiceEmitter emitter,
|
ServiceEmitter emitter,
|
||||||
@Processing ExecutorService exec,
|
@Processing ExecutorService exec,
|
||||||
@Smile ObjectMapper objectMapper,
|
@Smile ObjectMapper objectMapper,
|
||||||
Cache cache
|
Cache cache,
|
||||||
|
CacheConfig cacheConfig
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
this.segmentLoader = segmentLoader;
|
this.segmentLoader = segmentLoader;
|
||||||
|
@ -103,6 +106,7 @@ public class ServerManager implements QuerySegmentWalker
|
||||||
this.objectMapper = objectMapper;
|
this.objectMapper = objectMapper;
|
||||||
|
|
||||||
this.dataSources = new HashMap<>();
|
this.dataSources = new HashMap<>();
|
||||||
|
this.cacheConfig = cacheConfig;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Map<String, Long> getDataSourceSizes()
|
public Map<String, Long> getDataSourceSizes()
|
||||||
|
@ -412,7 +416,8 @@ public class ServerManager implements QuerySegmentWalker
|
||||||
objectMapper,
|
objectMapper,
|
||||||
cache,
|
cache,
|
||||||
toolChest,
|
toolChest,
|
||||||
new ReferenceCountingSegmentQueryRunner<T>(factory, adapter)
|
new ReferenceCountingSegmentQueryRunner<T>(factory, adapter),
|
||||||
|
cacheConfig
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
).withWaitMeasuredFromNow(),
|
).withWaitMeasuredFromNow(),
|
||||||
|
|
|
@ -1194,7 +1194,8 @@ public class CachingClusteredClientTest
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
cache,
|
cache,
|
||||||
jsonMapper
|
jsonMapper,
|
||||||
|
new CacheConfig()
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -37,6 +37,7 @@ import com.metamx.common.guava.YieldingAccumulator;
|
||||||
import com.metamx.common.guava.YieldingSequenceBase;
|
import com.metamx.common.guava.YieldingSequenceBase;
|
||||||
import com.metamx.emitter.EmittingLogger;
|
import com.metamx.emitter.EmittingLogger;
|
||||||
import com.metamx.emitter.service.ServiceMetricEvent;
|
import com.metamx.emitter.service.ServiceMetricEvent;
|
||||||
|
import io.druid.client.cache.CacheConfig;
|
||||||
import io.druid.client.cache.LocalCacheProvider;
|
import io.druid.client.cache.LocalCacheProvider;
|
||||||
import io.druid.granularity.QueryGranularity;
|
import io.druid.granularity.QueryGranularity;
|
||||||
import io.druid.jackson.DefaultObjectMapper;
|
import io.druid.jackson.DefaultObjectMapper;
|
||||||
|
@ -138,7 +139,8 @@ public class ServerManagerTest
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
new NoopServiceEmitter(),
|
new NoopServiceEmitter(),
|
||||||
serverManagerExec, new DefaultObjectMapper(), new LocalCacheProvider().get()
|
serverManagerExec, new DefaultObjectMapper(), new LocalCacheProvider().get(),
|
||||||
|
new CacheConfig()
|
||||||
);
|
);
|
||||||
|
|
||||||
loadQueryable("test", "1", new Interval("P1d/2011-04-01"));
|
loadQueryable("test", "1", new Interval("P1d/2011-04-01"));
|
||||||
|
|
|
@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableMap;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import com.google.common.util.concurrent.MoreExecutors;
|
import com.google.common.util.concurrent.MoreExecutors;
|
||||||
import com.metamx.common.logger.Logger;
|
import com.metamx.common.logger.Logger;
|
||||||
|
import io.druid.client.cache.CacheConfig;
|
||||||
import io.druid.client.cache.LocalCacheProvider;
|
import io.druid.client.cache.LocalCacheProvider;
|
||||||
import io.druid.concurrent.Execs;
|
import io.druid.concurrent.Execs;
|
||||||
import io.druid.curator.CuratorTestBase;
|
import io.druid.curator.CuratorTestBase;
|
||||||
|
@ -83,7 +84,8 @@ public class ZkCoordinatorTest extends CuratorTestBase
|
||||||
new NoopServiceEmitter(),
|
new NoopServiceEmitter(),
|
||||||
MoreExecutors.sameThreadExecutor(),
|
MoreExecutors.sameThreadExecutor(),
|
||||||
new DefaultObjectMapper(),
|
new DefaultObjectMapper(),
|
||||||
new LocalCacheProvider().get()
|
new LocalCacheProvider().get(),
|
||||||
|
new CacheConfig()
|
||||||
);
|
);
|
||||||
|
|
||||||
final DruidServerMetadata me = new DruidServerMetadata("dummyServer", "dummyHost", 0, "dummyType", "normal", 0);
|
final DruidServerMetadata me = new DruidServerMetadata("dummyServer", "dummyHost", 0, "dummyType", "normal", 0);
|
||||||
|
|
Loading…
Reference in New Issue