diff --git a/server/src/main/java/io/druid/client/CachePopulatingQueryRunner.java b/server/src/main/java/io/druid/client/CachePopulatingQueryRunner.java
new file mode 100644
index 00000000000..9ee9d2ec66f
--- /dev/null
+++ b/server/src/main/java/io/druid/client/CachePopulatingQueryRunner.java
@@ -0,0 +1,65 @@
+package io.druid.client;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.metamx.common.guava.Sequence;
+import com.metamx.common.guava.Sequences;
+import io.druid.client.cache.Cache;
+import io.druid.query.CacheStrategy;
+import io.druid.query.Query;
+import io.druid.query.QueryRunner;
+import io.druid.query.QueryToolChest;
+import io.druid.query.SegmentDescriptor;
+
+import java.util.ArrayList;
+
+public class CachePopulatingQueryRunner<T> implements QueryRunner<T>
+{
+
+  private final String segmentIdentifier;
+  private final SegmentDescriptor segmentDescriptor;
+  private final QueryRunner<T> base;
+  private final QueryToolChest toolChest;
+  private final Cache cache;
+  private final ObjectMapper mapper;
+
+  public CachePopulatingQueryRunner(
+      String segmentIdentifier,
+      SegmentDescriptor segmentDescriptor, ObjectMapper mapper,
+      Cache cache, QueryToolChest toolchest,
+      QueryRunner<T> base
+  )
+  {
+    this.base = base;
+    this.segmentIdentifier = segmentIdentifier;
+    this.segmentDescriptor = segmentDescriptor;
+    this.toolChest = toolchest;
+    this.cache = cache;
+    this.mapper = mapper;
+  }
+
+  @Override
+  public Sequence<T> run(Query<T> query)
+  {
+
+    final CacheStrategy strategy = toolChest.getCacheStrategy(query);
+
+    final boolean populateCache = Boolean.parseBoolean(query.getContextValue("populateCache", "true"))
+                                  && strategy != null && cache.getCacheConfig().isPopulateCache();
+    Sequence<T> results = base.run(query);
+    if (populateCache) {
+      Cache.NamedKey key = CacheUtil.computeSegmentCacheKey(
+          segmentIdentifier,
+          segmentDescriptor,
+          strategy.computeCacheKey(query)
+      );
+      CacheUtil.populate(
+          cache,
+          mapper,
+          key,
+          Sequences.toList(Sequences.map(results, strategy.prepareForCache()), new ArrayList())
+      );
+    }
+    return results;
+
+  }
+}
diff --git a/server/src/main/java/io/druid/client/CacheUtil.java b/server/src/main/java/io/druid/client/CacheUtil.java
new file mode 100644
index 00000000000..2021a70227d
--- /dev/null
+++ b/server/src/main/java/io/druid/client/CacheUtil.java
@@ -0,0 +1,61 @@
+package io.druid.client;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import io.druid.client.cache.Cache;
+import io.druid.query.SegmentDescriptor;
+import org.joda.time.Interval;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+public class CacheUtil
+{
+  public static Cache.NamedKey computeSegmentCacheKey(
+      String segmentIdentifier,
+      SegmentDescriptor descriptor,
+      byte[] queryCacheKey
+  )
+  {
+    final Interval segmentQueryInterval = descriptor.getInterval();
+    final byte[] versionBytes = descriptor.getVersion().getBytes();
+
+    return new Cache.NamedKey(
+        segmentIdentifier, ByteBuffer
+        .allocate(16 + versionBytes.length + 4 + queryCacheKey.length)
+        .putLong(segmentQueryInterval.getStartMillis())
+        .putLong(segmentQueryInterval.getEndMillis())
+        .put(versionBytes)
+        .putInt(descriptor.getPartitionNumber())
+        .put(queryCacheKey).array()
+    );
+  }
+
+  public static void populate(Cache cache, ObjectMapper mapper, Cache.NamedKey key, Iterable<Object> results)
+  {
+    try {
+      List<byte[]> bytes = Lists.newArrayList();
+      int size = 0;
+      for (Object result : results) {
+        final byte[] array = mapper.writeValueAsBytes(result);
+        size += array.length;
+        bytes.add(array);
+      }
+
+      byte[] valueBytes = new byte[size];
+      int offset = 0;
+      for (byte[] array : bytes) {
+        System.arraycopy(array, 0, valueBytes, offset, array.length);
+        offset += array.length;
+      }
+
+      cache.put(key, valueBytes);
+    }
+    catch (IOException e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+}
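
Note on the key layout: computeSegmentCacheKey above packs 8 bytes of interval start millis, 8 bytes of interval end millis, the segment version's bytes, 4 bytes of partition number, and finally the query-specific cache key into one array. A minimal sketch of reading such a key back (illustrative only; decodeSegmentCacheKey and the explicit versionLength parameter are hypothetical and not part of this patch):

    import java.nio.ByteBuffer;

    class SegmentCacheKeySketch
    {
      // Hypothetical decoder, shown only to document the layout written by
      // CacheUtil.computeSegmentCacheKey. The version length must be known
      // out of band, because it is not encoded in the key itself.
      static void decodeSegmentCacheKey(byte[] key, int versionLength)
      {
        ByteBuffer buf = ByteBuffer.wrap(key);
        long startMillis = buf.getLong();              // interval start millis
        long endMillis = buf.getLong();                // interval end millis
        byte[] version = new byte[versionLength];
        buf.get(version);                              // descriptor.getVersion().getBytes()
        int partitionNumber = buf.getInt();            // descriptor.getPartitionNumber()
        byte[] queryCacheKey = new byte[buf.remaining()];
        buf.get(queryCacheKey);                        // strategy.computeCacheKey(query)
      }
    }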
diff --git a/server/src/main/java/io/druid/client/CachingClusteredClient.java b/server/src/main/java/io/druid/client/CachingClusteredClient.java
index a9d6538b200..82eb182fb5f 100644
--- a/server/src/main/java/io/druid/client/CachingClusteredClient.java
+++ b/server/src/main/java/io/druid/client/CachingClusteredClient.java
@@ -62,7 +62,6 @@ import org.joda.time.DateTime;
 import org.joda.time.Interval;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
@@ -76,7 +75,6 @@ import java.util.concurrent.Executors;
 public class CachingClusteredClient<T> implements QueryRunner<T>
 {
   private static final EmittingLogger log = new EmittingLogger(CachingClusteredClient.class);
-
   private final QueryToolChestWarehouse warehouse;
   private final TimelineServerView serverView;
   private final Cache cache;
@@ -122,9 +120,11 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
     final List<Pair<DateTime, byte[]>> cachedResults = Lists.newArrayList();
     final Map<String, CachePopulator> cachePopulatorMap = Maps.newHashMap();
 
-    final boolean useCache = Boolean.parseBoolean(query.getContextValue("useCache", "true")) && strategy != null;
+    final boolean useCache = Boolean.parseBoolean(query.getContextValue("useCache", "true"))
+                             && strategy != null
+                             && cache.getCacheConfig().isUseCache();
     final boolean populateCache = Boolean.parseBoolean(query.getContextValue("populateCache", "true"))
-                                  && strategy != null;
+                                  && strategy != null && cache.getCacheConfig().isPopulateCache();
 
     final boolean isBySegment = Boolean.parseBoolean(query.getContextValue("bySegment", "false"));
 
@@ -134,6 +134,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
     contextBuilder.put("priority", priority);
 
     if (populateCache) {
+      contextBuilder.put("populateCache", "false");
       contextBuilder.put("bySegment", "true");
     }
     contextBuilder.put("intermediate", "true");
@@ -180,7 +181,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
       if (queryCacheKey != null) {
         Map<Pair<ServerSelector, SegmentDescriptor>, Cache.NamedKey> cacheKeys = Maps.newHashMap();
         for (Pair<ServerSelector, SegmentDescriptor> segment : segments) {
-          final Cache.NamedKey segmentCacheKey = computeSegmentCacheKey(
+          final Cache.NamedKey segmentCacheKey = CacheUtil.computeSegmentCacheKey(
               segment.lhs.getSegment().getIdentifier(),
               segment.rhs,
               queryCacheKey
@@ -286,7 +287,8 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
                       objectMapper.getFactory().createParser(cachedResult),
                       cacheObjectClazz
                   );
-                } catch (IOException e) {
+                }
+                catch (IOException e) {
                   throw Throwables.propagate(e);
                 }
               }
@@ -372,26 +374,6 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
     );
   }
 
-  private Cache.NamedKey computeSegmentCacheKey(
-      String segmentIdentifier,
-      SegmentDescriptor descriptor,
-      byte[] queryCacheKey
-  )
-  {
-    final Interval segmentQueryInterval = descriptor.getInterval();
-    final byte[] versionBytes = descriptor.getVersion().getBytes();
-
-    return new Cache.NamedKey(
-        segmentIdentifier, ByteBuffer
-        .allocate(16 + versionBytes.length + 4 + queryCacheKey.length)
-        .putLong(segmentQueryInterval.getStartMillis())
-        .putLong(segmentQueryInterval.getEndMillis())
-        .put(versionBytes)
-        .putInt(descriptor.getPartitionNumber())
-        .put(queryCacheKey).array()
-    );
-  }
-
   private static class CachePopulator
   {
     private final Cache cache;
@@ -407,26 +389,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
 
     public void populate(Iterable<Object> results)
     {
-      try {
-        List<byte[]> bytes = Lists.newArrayList();
-        int size = 0;
-        for (Object result : results) {
-          final byte[] array = mapper.writeValueAsBytes(result);
-          size += array.length;
-          bytes.add(array);
-        }
-
-        byte[] valueBytes = new byte[size];
-        int offset = 0;
-        for (byte[] array : bytes) {
-          System.arraycopy(array, 0, valueBytes, offset, array.length);
-          offset += array.length;
-        }
-
-        cache.put(key, valueBytes);
-      } catch (IOException e) {
-        throw Throwables.propagate(e);
-      }
+      CacheUtil.populate(cache, mapper, key, results);
    }
  }
 }
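
Two things change in the broker above: both cache flags now also require the global CacheConfig to allow them, and a broker that intends to populate its own result cache forwards "populateCache": "false" downstream so historicals do not redundantly populate their segment-level caches for the same query. A sketch of the context the broker now sends (illustrative only; the map shape mirrors the contextBuilder.put(...) calls above):

    import com.google.common.collect.ImmutableMap;

    import java.util.Map;

    class ForwardedContextSketch
    {
      // Per-segment results are requested from historicals, but their own
      // cache population is switched off for this query.
      static Map<String, Object> forwardedContext(int priority)
      {
        return ImmutableMap.<String, Object>builder()
            .put("priority", priority)
            .put("populateCache", "false")
            .put("bySegment", "true")
            .put("intermediate", "true")
            .build();
      }
    }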
diff --git a/server/src/main/java/io/druid/client/cache/Cache.java b/server/src/main/java/io/druid/client/cache/Cache.java
index 6ed875edb0b..f3a12151a6d 100644
--- a/server/src/main/java/io/druid/client/cache/Cache.java
+++ b/server/src/main/java/io/druid/client/cache/Cache.java
@@ -39,6 +39,8 @@ public interface Cache
 
   public CacheStats getStats();
 
+  public CacheConfig getCacheConfig();
+
   public class NamedKey
   {
     final public String namespace;
diff --git a/server/src/main/java/io/druid/client/cache/CacheConfig.java b/server/src/main/java/io/druid/client/cache/CacheConfig.java
new file mode 100644
index 00000000000..d8c80c7940d
--- /dev/null
+++ b/server/src/main/java/io/druid/client/cache/CacheConfig.java
@@ -0,0 +1,44 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012, 2013  Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+
+package io.druid.client.cache;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.validation.constraints.NotNull;
+
+public class CacheConfig
+{
+  @JsonProperty
+  private boolean useCache = true;
+
+  @JsonProperty
+  private boolean populateCache = true;
+
+
+  public boolean isPopulateCache()
+  {
+    return populateCache;
+  }
+
+  public boolean isUseCache()
+  {
+    return useCache;
+  }
+}
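
Since both flags are plain @JsonProperty booleans defaulting to true, a CacheConfig can be exercised directly through Jackson. A minimal sketch (illustrative only, not part of the patch):

    import com.fasterxml.jackson.databind.ObjectMapper;
    import io.druid.client.cache.CacheConfig;

    class CacheConfigSketch
    {
      public static void main(String[] args) throws Exception
      {
        ObjectMapper mapper = new ObjectMapper();
        // Field-level @JsonProperty lets Jackson bind the private fields.
        CacheConfig config = mapper.readValue(
            "{\"useCache\": true, \"populateCache\": false}",
            CacheConfig.class
        );
        System.out.println(config.isUseCache());      // true
        System.out.println(config.isPopulateCache()); // false
      }
    }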
diff --git a/server/src/main/java/io/druid/client/cache/LocalCacheProvider.java b/server/src/main/java/io/druid/client/cache/LocalCacheProvider.java
index 4f78457b5b9..0bc64fca4d0 100644
--- a/server/src/main/java/io/druid/client/cache/LocalCacheProvider.java
+++ b/server/src/main/java/io/druid/client/cache/LocalCacheProvider.java
@@ -25,24 +25,21 @@ import javax.validation.constraints.Min;
 
 /**
  */
-public class LocalCacheProvider implements CacheProvider
+public class LocalCacheProvider extends CacheConfig implements CacheProvider
 {
   @JsonProperty
   @Min(0)
   private long sizeInBytes = 0;
-
   @JsonProperty
   @Min(0)
   private int initialSize = 500000;
-
   @JsonProperty
   @Min(0)
   private int logEvictionCount = 0;
-
   @Override
   public Cache get()
   {
-    return new MapCache(new ByteCountingLRUMap(initialSize, logEvictionCount, sizeInBytes));
+    return new MapCache(new ByteCountingLRUMap(initialSize, logEvictionCount, sizeInBytes), this);
   }
 }
 
diff --git a/server/src/main/java/io/druid/client/cache/MapCache.java b/server/src/main/java/io/druid/client/cache/MapCache.java
index dda2929bc89..81d2f8c71df 100644
--- a/server/src/main/java/io/druid/client/cache/MapCache.java
+++ b/server/src/main/java/io/druid/client/cache/MapCache.java
@@ -33,9 +33,9 @@ import java.util.concurrent.atomic.AtomicLong;
  */
 public class MapCache implements Cache
 {
-  public static Cache create(long sizeInBytes)
+  public static Cache create(long sizeInBytes, CacheConfig config)
   {
-    return new MapCache(new ByteCountingLRUMap(sizeInBytes));
+    return new MapCache(new ByteCountingLRUMap(sizeInBytes),config);
   }
 
   private final Map<ByteBuffer, byte[]> baseMap;
@@ -48,12 +48,15 @@ public class MapCache implements Cache
 
   private final AtomicLong hitCount = new AtomicLong(0);
   private final AtomicLong missCount = new AtomicLong(0);
+  private final CacheConfig config;
 
   MapCache(
-      ByteCountingLRUMap byteCountingLRUMap
+      ByteCountingLRUMap byteCountingLRUMap,
+      CacheConfig config
   )
   {
     this.byteCountingLRUMap = byteCountingLRUMap;
+    this.config = config;
 
     this.baseMap = Collections.synchronizedMap(byteCountingLRUMap);
 
@@ -150,4 +153,10 @@ public class MapCache implements Cache
     retVal.rewind();
     return retVal;
   }
+
+  @Override
+  public CacheConfig getCacheConfig()
+  {
+    return config;
+  }
 }
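
Every Cache implementation now carries its CacheConfig, so a caller holding only the Cache can consult the use/populate flags. A usage sketch in the style of the tests further down (illustrative only):

    import io.druid.client.cache.Cache;
    import io.druid.client.cache.CacheConfig;
    import io.druid.client.cache.MapCache;

    class MapCacheUsageSketch
    {
      public static void main(String[] args)
      {
        // A 1 MB local cache with default flags (both true).
        Cache cache = MapCache.create(1024 * 1024, new CacheConfig());
        System.out.println(cache.getCacheConfig().isPopulateCache());
      }
    }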
diff --git a/server/src/main/java/io/druid/client/cache/MemcachedCache.java b/server/src/main/java/io/druid/client/cache/MemcachedCache.java
index abbfb54139c..c01ace15930 100644
--- a/server/src/main/java/io/druid/client/cache/MemcachedCache.java
+++ b/server/src/main/java/io/druid/client/cache/MemcachedCache.java
@@ -72,9 +72,7 @@ public class MemcachedCache implements Cache
               .build(),
           AddrUtil.getAddresses(config.getHosts())
           ),
-          config.getMemcachedPrefix(),
-          config.getTimeout(),
-          config.getExpiration()
+          config
       );
     } catch(IOException e) {
       throw Throwables.propagate(e);
@@ -92,15 +90,18 @@ public class MemcachedCache implements Cache
   private final AtomicLong timeoutCount = new AtomicLong(0);
   private final AtomicLong errorCount = new AtomicLong(0);
 
-  MemcachedCache(MemcachedClientIF client, String memcachedPrefix, int timeout, int expiration) {
-    Preconditions.checkArgument(memcachedPrefix.length() <= MAX_PREFIX_LENGTH,
+  private final CacheConfig config;
+
+  MemcachedCache(MemcachedClientIF client, MemcachedCacheConfig config) {
+    Preconditions.checkArgument(config.getMemcachedPrefix().length() <= MAX_PREFIX_LENGTH,
                                 "memcachedPrefix length [%d] exceeds maximum length [%d]",
-                                memcachedPrefix.length(),
+                                config.getMemcachedPrefix().length(),
                                 MAX_PREFIX_LENGTH);
 
-    this.timeout = timeout;
-    this.expiration = expiration;
+    this.timeout = config.getTimeout();
+    this.expiration = config.getExpiration();
     this.client = client;
-    this.memcachedPrefix = memcachedPrefix;
+    this.memcachedPrefix = config.getMemcachedPrefix();
+    this.config = config;
   }
 
@@ -117,6 +118,12 @@ public class MemcachedCache implements Cache
     );
   }
 
+  @Override
+  public CacheConfig getCacheConfig()
+  {
+    return config;
+  }
+
   @Override
   public byte[] get(NamedKey key)
   {
diff --git a/server/src/main/java/io/druid/client/cache/MemcachedCacheConfig.java b/server/src/main/java/io/druid/client/cache/MemcachedCacheConfig.java
index 2d8674cdd24..1bc15ddc2c0 100644
--- a/server/src/main/java/io/druid/client/cache/MemcachedCacheConfig.java
+++ b/server/src/main/java/io/druid/client/cache/MemcachedCacheConfig.java
@@ -23,7 +23,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 
 import javax.validation.constraints.NotNull;
 
-public class MemcachedCacheConfig
+public class MemcachedCacheConfig extends CacheConfig
 {
   @JsonProperty
   private int expiration = 2592000; // What is this number?
diff --git a/server/src/main/java/io/druid/server/coordination/ServerManager.java b/server/src/main/java/io/druid/server/coordination/ServerManager.java
index 3ee0c108f8a..d926e9a5d70 100644
--- a/server/src/main/java/io/druid/server/coordination/ServerManager.java
+++ b/server/src/main/java/io/druid/server/coordination/ServerManager.java
@@ -19,6 +19,7 @@
 
 package io.druid.server.coordination;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Function;
 import com.google.common.base.Predicates;
 import com.google.common.collect.Ordering;
@@ -28,8 +29,11 @@
 import com.metamx.common.guava.FunctionalIterable;
 import com.metamx.emitter.EmittingLogger;
 import com.metamx.emitter.service.ServiceEmitter;
 import com.metamx.emitter.service.ServiceMetricEvent;
+import io.druid.client.CachePopulatingQueryRunner;
+import io.druid.client.cache.Cache;
 import io.druid.collections.CountingMap;
 import io.druid.guice.annotations.Processing;
+import io.druid.guice.annotations.Smile;
 import io.druid.query.BySegmentQueryRunner;
 import io.druid.query.DataSource;
@@ -44,7 +48,6 @@
 import io.druid.query.QueryToolChest;
 import io.druid.query.ReferenceCountingSegmentQueryRunner;
 import io.druid.query.SegmentDescriptor;
 import io.druid.query.TableDataSource;
-import io.druid.query.spec.QuerySegmentSpec;
 import io.druid.query.spec.SpecificSegmentQueryRunner;
 import io.druid.query.spec.SpecificSegmentSpec;
 import io.druid.segment.ReferenceCountingSegment;
@@ -70,24 +73,25 @@
 public class ServerManager implements QuerySegmentWalker
 {
   private static final EmittingLogger log = new EmittingLogger(ServerManager.class);
-
   private final Object lock = new Object();
-
   private final SegmentLoader segmentLoader;
   private final QueryRunnerFactoryConglomerate conglomerate;
   private final ServiceEmitter emitter;
   private final ExecutorService exec;
-
   private final Map<String, VersionedIntervalTimeline<String, ReferenceCountingSegment>> dataSources;
   private final CountingMap<String> dataSourceSizes = new CountingMap<String>();
   private final CountingMap<String> dataSourceCounts = new CountingMap<String>();
+  private final Cache cache;
+  private final ObjectMapper objectMapper;
 
   @Inject
   public ServerManager(
       SegmentLoader segmentLoader,
       QueryRunnerFactoryConglomerate conglomerate,
       ServiceEmitter emitter,
-      @Processing ExecutorService exec
+      @Processing ExecutorService exec,
+      @Smile ObjectMapper objectMapper,
+      Cache cache
   )
   {
     this.segmentLoader = segmentLoader;
@@ -95,6 +99,8 @@ public class ServerManager implements QuerySegmentWalker
     this.emitter = emitter;
     this.exec = exec;
+    this.cache = cache;
+    this.objectMapper = objectMapper;
 
     this.dataSources = new HashMap<>();
   }
@@ -122,7 +128,9 @@ public class ServerManager implements QuerySegmentWalker
    * Load a single segment.
    *
    * @param segment segment to load
+   *
    * @return true if the segment was newly loaded, false if it was already loaded
+   *
    * @throws SegmentLoadingException if the segment cannot be loaded
    */
   public boolean loadSegment(final DataSegment segment) throws SegmentLoadingException
@@ -130,10 +138,12 @@ public class ServerManager implements QuerySegmentWalker
     final Segment adapter;
     try {
       adapter = segmentLoader.getSegment(segment);
-    } catch (SegmentLoadingException e) {
+    }
+    catch (SegmentLoadingException e) {
       try {
         segmentLoader.cleanup(segment);
-      } catch (SegmentLoadingException e1) {
+      }
+      catch (SegmentLoadingException e1) {
         // ignore
       }
       throw e;
@@ -205,11 +215,12 @@ public class ServerManager implements QuerySegmentWalker
       try {
        log.info("Attempting to close segment %s", segment.getIdentifier());
        oldQueryable.close();
-      } catch (IOException e) {
+      }
+      catch (IOException e) {
        log.makeAlert(e, "Exception closing segment")
-            .addData("dataSource", dataSource)
-            .addData("segmentId", segment.getIdentifier())
-            .emit();
+           .addData("dataSource", dataSource)
+           .addData("segmentId", segment.getIdentifier())
+           .emit();
      }
    } else {
      log.info(
@@ -241,7 +252,8 @@ public class ServerManager implements QuerySegmentWalker
     String dataSourceName;
     try {
       dataSourceName = ((TableDataSource) query.getDataSource()).getName();
-    } catch (ClassCastException e) {
+    }
+    catch (ClassCastException e) {
       throw new UnsupportedOperationException("Subqueries are only supported in the broker");
     }
 
@@ -287,13 +299,13 @@ public class ServerManager implements QuerySegmentWalker
                         factory,
                         toolChest,
                         input.getObject(),
-                        new SpecificSegmentSpec(
-                            new SegmentDescriptor(
-                                holder.getInterval(),
-                                holder.getVersion(),
-                                input.getChunkNumber()
-                            )
+
+                        new SegmentDescriptor(
+                            holder.getInterval(),
+                            holder.getVersion(),
+                            input.getChunkNumber()
                         )
+
                     );
                   }
                 }
@@ -316,8 +328,8 @@ public class ServerManager implements QuerySegmentWalker
     final QueryRunnerFactory<T, Query<T>> factory = conglomerate.findFactory(query);
     if (factory == null) {
       log.makeAlert("Unknown query type, [%s]", query.getClass())
-          .addData("dataSource", query.getDataSource())
-          .emit();
+         .addData("dataSource", query.getDataSource())
+         .emit();
       return new NoopQueryRunner<T>();
     }
 
@@ -326,7 +338,8 @@ public class ServerManager implements QuerySegmentWalker
     String dataSourceName;
     try {
       dataSourceName = ((TableDataSource) query.getDataSource()).getName();
-    } catch (ClassCastException e) {
+    }
+    catch (ClassCastException e) {
       throw new UnsupportedOperationException("Subqueries are only supported in the broker");
     }
 
@@ -360,7 +373,7 @@ public class ServerManager implements QuerySegmentWalker
 
                     final ReferenceCountingSegment adapter = chunk.getObject();
                     return Arrays.asList(
-                        buildAndDecorateQueryRunner(factory, toolChest, adapter, new SpecificSegmentSpec(input))
+                        buildAndDecorateQueryRunner(factory, toolChest, adapter, input)
                     );
                   }
                 }
@@ -376,9 +389,10 @@ public class ServerManager implements QuerySegmentWalker
       final QueryRunnerFactory<T, Query<T>> factory,
       final QueryToolChest<T, Query<T>> toolChest,
       final ReferenceCountingSegment adapter,
-      final QuerySegmentSpec segmentSpec
+      final SegmentDescriptor segmentDescriptor
   )
   {
+    SpecificSegmentSpec segmentSpec = new SpecificSegmentSpec(segmentDescriptor);
     return new SpecificSegmentQueryRunner<T>(
         new MetricsEmittingQueryRunner<T>(
             emitter,
@@ -393,7 +407,14 @@ public class ServerManager implements QuerySegmentWalker
             new BySegmentQueryRunner<T>(
                 adapter.getIdentifier(),
                 adapter.getDataInterval().getStart(),
-                new ReferenceCountingSegmentQueryRunner<T>(factory, adapter)
+                new CachePopulatingQueryRunner<T>(
+                    adapter.getIdentifier(),
+                    segmentDescriptor,
+                    objectMapper,
+                    cache,
+                    toolChest,
+                    new ReferenceCountingSegmentQueryRunner<T>(factory, adapter)
+                )
             )
         ).withWaitMeasuredFromNow(),
         segmentSpec
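
The net effect of the buildAndDecorateQueryRunner change is one additional decorator in the per-segment runner stack on a historical node; the cache-populating runner wraps the reference-counting runner so per-segment results are written to the segment cache as they are produced. Schematically (names from the patch; rendering illustrative):

    SpecificSegmentQueryRunner
      MetricsEmittingQueryRunner
        BySegmentQueryRunner
          CachePopulatingQueryRunner            // new in this patch
            ReferenceCountingSegmentQueryRunner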
diff --git a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java
index 40045ec8fa7..aeca47ac82a 100644
--- a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java
+++ b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java
@@ -36,6 +36,7 @@ import com.metamx.common.guava.Sequence;
 import com.metamx.common.guava.Sequences;
 import com.metamx.common.guava.nary.TrinaryFn;
 import io.druid.client.cache.Cache;
+import io.druid.client.cache.CacheConfig;
 import io.druid.client.cache.MapCache;
 import io.druid.client.selector.QueryableDruidServer;
 import io.druid.client.selector.RandomServerSelectorStrategy;
@@ -195,7 +196,7 @@ public class CachingClusteredClientTest
   {
     timeline = new VersionedIntervalTimeline<>(Ordering.<String>natural());
     serverView = EasyMock.createStrictMock(TimelineServerView.class);
-    cache = MapCache.create(100000);
+    cache = MapCache.create(100000, new CacheConfig());
 
     client = makeClient();
 
diff --git a/server/src/test/java/io/druid/client/cache/MapCacheTest.java b/server/src/test/java/io/druid/client/cache/MapCacheTest.java
index fbff6f9fe2f..67964f6d13c 100644
--- a/server/src/test/java/io/druid/client/cache/MapCacheTest.java
+++ b/server/src/test/java/io/druid/client/cache/MapCacheTest.java
@@ -37,7 +37,7 @@ public class MapCacheTest
   public void setUp() throws Exception
   {
     baseMap = new ByteCountingLRUMap(1024 * 1024);
-    cache = new MapCache(baseMap);
+    cache = new MapCache(baseMap, new CacheConfig());
   }
 
   @Test
diff --git a/server/src/test/java/io/druid/client/cache/MemcachedCacheBenchmark.java b/server/src/test/java/io/druid/client/cache/MemcachedCacheBenchmark.java
index 7ef6047d9ac..6867fcc07c4 100644
--- a/server/src/test/java/io/druid/client/cache/MemcachedCacheBenchmark.java
+++ b/server/src/test/java/io/druid/client/cache/MemcachedCacheBenchmark.java
@@ -38,19 +38,23 @@ import java.util.concurrent.TimeUnit;
 
 public class MemcachedCacheBenchmark extends SimpleBenchmark
 {
-  private static final String BASE_KEY = "test_2012-11-26T00:00:00.000Z_2012-11-27T00:00:00.000Z_2012-11-27T04:11:25.979Z_";
   public static final String NAMESPACE = "default";
-
+  private static final String BASE_KEY = "test_2012-11-26T00:00:00.000Z_2012-11-27T00:00:00.000Z_2012-11-27T04:11:25.979Z_";
+  private static byte[] randBytes;
+  @Param({"localhost:11211"})
+  String hosts;
+  // object size in kB
+  @Param({"1", "5", "10", "40"})
+  int objectSize;
+  @Param({"100", "1000"})
+  int objectCount;
   private MemcachedCache cache;
   private MemcachedClientIF client;
-  private static byte[] randBytes;
-
-  @Param({"localhost:11211"}) String hosts;
-
-  // object size in kB
-  @Param({"1", "5", "10", "40"}) int objectSize;
-  @Param({"100", "1000"}) int objectCount;
+  public static void main(String[] args) throws Exception
+  {
+    Runner.main(MemcachedCacheBenchmark.class, args);
+  }
 
   @Override
   protected void setUp() throws Exception
   {
@@ -73,11 +77,29 @@ public class MemcachedCacheBenchmark extends SimpleBenchmark
           AddrUtil.getAddresses(hosts)
       );
 
+
     cache = new MemcachedCache(
         client,
-        "druid-memcached-benchmark",
-        30000, // 30 seconds
-        3600 // 1 hour
+        new MemcachedCacheConfig()
+        {
+          @Override
+          public String getMemcachedPrefix()
+          {
+            return "druid-memcached-benchmark";
+          }
+
+          @Override
+          public int getTimeout()
+          {
+            return 30000;
+          }
+
+          @Override
+          public int getExpiration()
+          {
+            return 3600;
+          }
+        }
     );
 
     randBytes = new byte[objectSize * 1024];
@@ -90,9 +112,10 @@ public class MemcachedCacheBenchmark extends SimpleBenchmark
     client.shutdown(1, TimeUnit.MINUTES);
   }
 
-  public void timePutObjects(int reps) {
-    for(int i = 0; i < reps; ++i) {
-      for(int k = 0; k < objectCount; ++k) {
+  public void timePutObjects(int reps)
+  {
+    for (int i = 0; i < reps; ++i) {
+      for (int k = 0; k < objectCount; ++k) {
         String key = BASE_KEY + k;
         cache.put(new Cache.NamedKey(NAMESPACE, key.getBytes()), randBytes);
       }
@@ -101,11 +124,12 @@ public class MemcachedCacheBenchmark extends SimpleBenchmark
     }
   }
 
-  public long timeGetObject(int reps) {
+  public long timeGetObject(int reps)
+  {
     byte[] bytes = null;
     long count = 0;
     for (int i = 0; i < reps; i++) {
-      for(int k = 0; k < objectCount; ++k) {
+      for (int k = 0; k < objectCount; ++k) {
         String key = BASE_KEY + k;
         bytes = cache.get(new Cache.NamedKey(NAMESPACE, key.getBytes()));
         count += bytes.length;
@@ -114,24 +138,21 @@ public class MemcachedCacheBenchmark extends SimpleBenchmark
     return count;
   }
 
-  public long timeBulkGetObjects(int reps) {
+  public long timeBulkGetObjects(int reps)
+  {
     long count = 0;
     for (int i = 0; i < reps; i++) {
       List<Cache.NamedKey> keys = Lists.newArrayList();
-      for(int k = 0; k < objectCount; ++k) {
+      for (int k = 0; k < objectCount; ++k) {
         String key = BASE_KEY + k;
         keys.add(new Cache.NamedKey(NAMESPACE, key.getBytes()));
       }
       Map<Cache.NamedKey, byte[]> results = cache.getBulk(keys);
-      for(Cache.NamedKey key : keys) {
+      for (Cache.NamedKey key : keys) {
        byte[] bytes = results.get(key);
        count += bytes.length;
      }
    }
    return count;
  }
-
-  public static void main(String[] args) throws Exception {
-    Runner.main(MemcachedCacheBenchmark.class, args);
-  }
 }
diff --git a/server/src/test/java/io/druid/client/cache/MemcachedCacheTest.java b/server/src/test/java/io/druid/client/cache/MemcachedCacheTest.java
index 55c85f967f0..d55127288c3 100644
--- a/server/src/test/java/io/druid/client/cache/MemcachedCacheTest.java
+++ b/server/src/test/java/io/druid/client/cache/MemcachedCacheTest.java
@@ -60,7 +60,29 @@
   public void setUp() throws Exception
   {
     MemcachedClientIF client = new MockMemcachedClient();
-    cache = new MemcachedCache(client, "druid-memcached-test", 500, 3600);
+    cache = new MemcachedCache(
+        client, new MemcachedCacheConfig()
+    {
+      @Override
+      public String getMemcachedPrefix()
+      {
+        return "druid-memcached-test";
+      }
+
+      @Override
+      public int getTimeout()
+      {
+        return 500;
+      }
+
+      @Override
+      public int getExpiration()
+      {
+        return 3600;
+      }
+
+    }
+    );
   }
 
   @Test
@@ -396,64 +418,64 @@ public <T> BulkFuture<Map<String, T>> asyncGetBulk(final Iterator<String> keys,
                                                     final Transcoder<T> tc)
   {
     return new BulkFuture<Map<String, T>>()
-        {
-          @Override
-          public boolean isTimeout()
-          {
-            return false;
-          }
+    {
+      @Override
+      public boolean isTimeout()
+      {
+        return false;
+      }
 
-          @Override
-          public Map<String, T> getSome(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException
-          {
-            return get();
-          }
+      @Override
+      public Map<String, T> getSome(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException
+      {
+        return get();
+      }
 
-          @Override
-          public OperationStatus getStatus()
-          {
-            return null;
-          }
+      @Override
+      public OperationStatus getStatus()
+      {
+        return null;
+      }
 
-          @Override
-          public boolean cancel(boolean b)
-          {
-            return false;
-          }
+      @Override
+      public boolean cancel(boolean b)
+      {
+        return false;
+      }
 
-          @Override
-          public boolean isCancelled()
-          {
-            return false;
-          }
+      @Override
+      public boolean isCancelled()
+      {
+        return false;
+      }
 
-          @Override
-          public boolean isDone()
-          {
-            return true;
-          }
+      @Override
+      public boolean isDone()
+      {
+        return true;
+      }
 
-          @Override
-          public Map<String, T> get() throws InterruptedException, ExecutionException
-          {
-            Map<String, T> retVal = Maps.newHashMap();
+      @Override
+      public Map<String, T> get() throws InterruptedException, ExecutionException
+      {
+        Map<String, T> retVal = Maps.newHashMap();
 
-            while(keys.hasNext()) {
-              String key = keys.next();
-              CachedData data = theMap.get(key);
-              retVal.put(key, data != null ? tc.decode(data) : null);
-            }
+        while (keys.hasNext()) {
+          String key = keys.next();
+          CachedData data = theMap.get(key);
+          retVal.put(key, data != null ? tc.decode(data) : null);
+        }
 
-            return retVal;
-          }
+        return retVal;
+      }
 
-          @Override
-          public Map<String, T> get(long l, TimeUnit timeUnit)
-              throws InterruptedException, ExecutionException, TimeoutException
-          {
-            return get();
-          }
-        };
+      @Override
+      public Map<String, T> get(long l, TimeUnit timeUnit)
+          throws InterruptedException, ExecutionException, TimeoutException
+      {
+        return get();
+      }
+    };
   }
 
   @Override
diff --git a/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java b/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java
index 013544dabf1..2e0bb123a44 100644
--- a/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java
+++ b/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java
@@ -37,7 +37,9 @@ import com.metamx.common.guava.YieldingAccumulator;
 import com.metamx.common.guava.YieldingSequenceBase;
 import com.metamx.emitter.EmittingLogger;
 import com.metamx.emitter.service.ServiceMetricEvent;
+import io.druid.client.cache.LocalCacheProvider;
 import io.druid.granularity.QueryGranularity;
+import io.druid.jackson.DefaultObjectMapper;
 import io.druid.query.ConcatQueryRunner;
 import io.druid.query.Druids;
 import io.druid.query.NoopQueryRunner;
@@ -136,7 +138,7 @@ public class ServerManagerTest
           }
         },
         new NoopServiceEmitter(),
-        serverManagerExec
+        serverManagerExec, new DefaultObjectMapper(), new LocalCacheProvider().get()
     );
 
     loadQueryable("test", "1", new Interval("P1d/2011-04-01"));
@@ -592,9 +594,7 @@ public class ServerManagerTest
   {
     private final String version;
     private final Interval interval;
-
     private final Object lock = new Object();
-
     private volatile boolean closed = false;
 
     SegmentForTesting(
diff --git a/server/src/test/java/io/druid/server/coordination/ZkCoordinatorTest.java b/server/src/test/java/io/druid/server/coordination/ZkCoordinatorTest.java
index 77b4c9f8144..e12a0f6e805 100644
--- a/server/src/test/java/io/druid/server/coordination/ZkCoordinatorTest.java
+++ b/server/src/test/java/io/druid/server/coordination/ZkCoordinatorTest.java
@@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.metamx.common.logger.Logger;
+import io.druid.client.cache.LocalCacheProvider;
 import io.druid.concurrent.Execs;
 import io.druid.curator.CuratorTestBase;
 import io.druid.curator.announcement.Announcer;
@@ -52,12 +53,12 @@ import java.util.List;
  */
 public class ZkCoordinatorTest extends CuratorTestBase
 {
+  private static final Logger log = new Logger(ZkCoordinatorTest.class);
+  private final ObjectMapper jsonMapper = new DefaultObjectMapper();
   private ZkCoordinator zkCoordinator;
   private ServerManager serverManager;
   private DataSegmentAnnouncer announcer;
   private File infoDir;
-  private final ObjectMapper jsonMapper = new DefaultObjectMapper();
-  private static final Logger log = new Logger(ZkCoordinatorTest.class);
 
   @Before
   public void setUp() throws Exception
@@ -80,7 +81,9 @@ public class ZkCoordinatorTest extends CuratorTestBase
         new CacheTestSegmentLoader(),
         new NoopQueryRunnerFactoryConglomerate(),
         new NoopServiceEmitter(),
-        MoreExecutors.sameThreadExecutor()
+        MoreExecutors.sameThreadExecutor(),
+        new DefaultObjectMapper(),
+        new LocalCacheProvider().get()
     );
 
     final DruidServerMetadata me = new DruidServerMetadata("dummyServer", "dummyHost", 0, "dummyType", "normal", 0);
@@ -100,7 +103,8 @@ public class ZkCoordinatorTest extends CuratorTestBase
 
     zkCoordinator = new ZkCoordinator(
         jsonMapper,
-        new SegmentLoaderConfig(){
+        new SegmentLoaderConfig()
+        {
          @Override
          public File getInfoDir()
          {
@@ -214,7 +218,7 @@ public class ZkCoordinatorTest extends CuratorTestBase
 
     List<String> sortedFiles = Lists.newArrayList(files);
     Collections.sort(sortedFiles);
-    
+
     Assert.assertEquals(segments.size(), sortedFiles.size());
     for (int i = 0; i < sortedFiles.size(); i++) {
       DataSegment segment = jsonMapper.readValue(sortedFiles.get(i), DataSegment.class);
diff --git a/services/src/main/java/io/druid/cli/CliHistorical.java b/services/src/main/java/io/druid/cli/CliHistorical.java
index c047332b2dc..e736380ac5c 100644
--- a/services/src/main/java/io/druid/cli/CliHistorical.java
+++ b/services/src/main/java/io/druid/cli/CliHistorical.java
@@ -24,7 +24,10 @@ import com.google.inject.Binder;
 import com.google.inject.Module;
 import com.metamx.common.logger.Logger;
 import io.airlift.command.Command;
+import io.druid.client.cache.Cache;
+import io.druid.client.cache.CacheProvider;
 import io.druid.guice.Jerseys;
+import io.druid.guice.JsonConfigProvider;
 import io.druid.guice.LazySingleton;
 import io.druid.guice.LifecycleModule;
 import io.druid.guice.ManageLifecycle;
@@ -73,6 +76,8 @@ public class CliHistorical extends ServerRunnable
 
         LifecycleModule.register(binder, ZkCoordinator.class);
         LifecycleModule.register(binder, Server.class);
+        binder.bind(Cache.class).toProvider(CacheProvider.class).in(ManageLifecycle.class);
+        JsonConfigProvider.bind(binder, "druid.historical.cache", CacheProvider.class);
       }
     }
   );
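
With the binding above, a historical node builds its Cache from the provider configured under the "druid.historical.cache" property prefix. A hand-wired equivalent in the style of the tests in this patch (illustrative only, not part of the patch):

    import io.druid.client.cache.Cache;
    import io.druid.client.cache.LocalCacheProvider;

    class HistoricalCacheWiringSketch
    {
      public static void main(String[] args)
      {
        // LocalCacheProvider extends CacheConfig, so the useCache/populateCache
        // flags travel with the Cache it builds; both default to true.
        Cache cache = new LocalCacheProvider().get();
        System.out.println(cache.getCacheConfig().isUseCache());
        System.out.println(cache.getCacheConfig().isPopulateCache());
      }
    }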