Memcache changes

This commit is contained in:
nishantmonu51 2014-03-17 18:54:30 +05:30
parent 1f62257ac2
commit 468e8e0b31
17 changed files with 394 additions and 172 deletions

View File

@ -0,0 +1,65 @@
package io.druid.client;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import io.druid.client.cache.Cache;
import io.druid.query.CacheStrategy;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;
import io.druid.query.SegmentDescriptor;
import java.util.ArrayList;
public class CachePopulatingQueryRunner<T> implements QueryRunner<T>
{
private final String segmentIdentifier;
private final SegmentDescriptor segmentDescriptor;
private final QueryRunner<T> base;
private final QueryToolChest toolChest;
private final Cache cache;
private final ObjectMapper mapper;
public CachePopulatingQueryRunner(
String segmentIdentifier,
SegmentDescriptor segmentDescriptor, ObjectMapper mapper,
Cache cache, QueryToolChest toolchest,
QueryRunner<T> base
)
{
this.base = base;
this.segmentIdentifier = segmentIdentifier;
this.segmentDescriptor = segmentDescriptor;
this.toolChest = toolchest;
this.cache = cache;
this.mapper = mapper;
}
@Override
public Sequence<T> run(Query<T> query)
{
final CacheStrategy strategy = toolChest.getCacheStrategy(query);
final boolean populateCache = Boolean.parseBoolean(query.getContextValue("populateCache", "true"))
&& strategy != null && cache.getCacheConfig().isPopulateCache();
Sequence<T> results = base.run(query);
if (populateCache) {
Cache.NamedKey key = CacheUtil.computeSegmentCacheKey(
segmentIdentifier,
segmentDescriptor,
strategy.computeCacheKey(query)
);
CacheUtil.populate(
cache,
mapper,
key,
Sequences.toList(Sequences.map(results, strategy.prepareForCache()), new ArrayList())
);
}
return results;
}
}

View File

@ -0,0 +1,61 @@
package io.druid.client;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import io.druid.client.cache.Cache;
import io.druid.query.SegmentDescriptor;
import org.joda.time.Interval;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
public class CacheUtil
{
public static Cache.NamedKey computeSegmentCacheKey(
String segmentIdentifier,
SegmentDescriptor descriptor,
byte[] queryCacheKey
)
{
final Interval segmentQueryInterval = descriptor.getInterval();
final byte[] versionBytes = descriptor.getVersion().getBytes();
return new Cache.NamedKey(
segmentIdentifier, ByteBuffer
.allocate(16 + versionBytes.length + 4 + queryCacheKey.length)
.putLong(segmentQueryInterval.getStartMillis())
.putLong(segmentQueryInterval.getEndMillis())
.put(versionBytes)
.putInt(descriptor.getPartitionNumber())
.put(queryCacheKey).array()
);
}
public static void populate(Cache cache, ObjectMapper mapper, Cache.NamedKey key, Iterable<Object> results)
{
try {
List<byte[]> bytes = Lists.newArrayList();
int size = 0;
for (Object result : results) {
final byte[] array = mapper.writeValueAsBytes(result);
size += array.length;
bytes.add(array);
}
byte[] valueBytes = new byte[size];
int offset = 0;
for (byte[] array : bytes) {
System.arraycopy(array, 0, valueBytes, offset, array.length);
offset += array.length;
}
cache.put(key, valueBytes);
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}
}

View File

@ -62,7 +62,6 @@ import org.joda.time.DateTime;
import org.joda.time.Interval;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
@ -76,7 +75,6 @@ import java.util.concurrent.Executors;
public class CachingClusteredClient<T> implements QueryRunner<T>
{
private static final EmittingLogger log = new EmittingLogger(CachingClusteredClient.class);
private final QueryToolChestWarehouse warehouse;
private final TimelineServerView serverView;
private final Cache cache;
@ -122,9 +120,11 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
final List<Pair<DateTime, byte[]>> cachedResults = Lists.newArrayList();
final Map<String, CachePopulator> cachePopulatorMap = Maps.newHashMap();
final boolean useCache = Boolean.parseBoolean(query.getContextValue("useCache", "true")) && strategy != null;
final boolean useCache = Boolean.parseBoolean(query.getContextValue("useCache", "true"))
&& strategy != null
&& cache.getCacheConfig().isUseCache();
final boolean populateCache = Boolean.parseBoolean(query.getContextValue("populateCache", "true"))
&& strategy != null;
&& strategy != null && cache.getCacheConfig().isPopulateCache();
final boolean isBySegment = Boolean.parseBoolean(query.getContextValue("bySegment", "false"));
@ -134,6 +134,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
contextBuilder.put("priority", priority);
if (populateCache) {
contextBuilder.put("populateCache", "false");
contextBuilder.put("bySegment", "true");
}
contextBuilder.put("intermediate", "true");
@ -180,7 +181,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
if (queryCacheKey != null) {
Map<Pair<ServerSelector, SegmentDescriptor>, Cache.NamedKey> cacheKeys = Maps.newHashMap();
for (Pair<ServerSelector, SegmentDescriptor> segment : segments) {
final Cache.NamedKey segmentCacheKey = computeSegmentCacheKey(
final Cache.NamedKey segmentCacheKey = CacheUtil.computeSegmentCacheKey(
segment.lhs.getSegment().getIdentifier(),
segment.rhs,
queryCacheKey
@ -286,7 +287,8 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
objectMapper.getFactory().createParser(cachedResult),
cacheObjectClazz
);
} catch (IOException e) {
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}
@ -372,26 +374,6 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
);
}
private Cache.NamedKey computeSegmentCacheKey(
String segmentIdentifier,
SegmentDescriptor descriptor,
byte[] queryCacheKey
)
{
final Interval segmentQueryInterval = descriptor.getInterval();
final byte[] versionBytes = descriptor.getVersion().getBytes();
return new Cache.NamedKey(
segmentIdentifier, ByteBuffer
.allocate(16 + versionBytes.length + 4 + queryCacheKey.length)
.putLong(segmentQueryInterval.getStartMillis())
.putLong(segmentQueryInterval.getEndMillis())
.put(versionBytes)
.putInt(descriptor.getPartitionNumber())
.put(queryCacheKey).array()
);
}
private static class CachePopulator
{
private final Cache cache;
@ -407,26 +389,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
public void populate(Iterable<Object> results)
{
try {
List<byte[]> bytes = Lists.newArrayList();
int size = 0;
for (Object result : results) {
final byte[] array = mapper.writeValueAsBytes(result);
size += array.length;
bytes.add(array);
}
byte[] valueBytes = new byte[size];
int offset = 0;
for (byte[] array : bytes) {
System.arraycopy(array, 0, valueBytes, offset, array.length);
offset += array.length;
}
cache.put(key, valueBytes);
} catch (IOException e) {
throw Throwables.propagate(e);
}
CacheUtil.populate(cache, mapper, key, results);
}
}
}

View File

@ -39,6 +39,8 @@ public interface Cache
public CacheStats getStats();
public CacheConfig getCacheConfig();
public class NamedKey
{
final public String namespace;

View File

@ -0,0 +1,44 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.client.cache;
import com.fasterxml.jackson.annotation.JsonProperty;
import javax.validation.constraints.NotNull;
public class CacheConfig
{
@JsonProperty
private boolean useCache = true;
@JsonProperty
private boolean populateCache = true;
public boolean isPopulateCache()
{
return populateCache;
}
public boolean isUseCache()
{
return useCache;
}
}

View File

@ -25,24 +25,21 @@ import javax.validation.constraints.Min;
/**
*/
public class LocalCacheProvider implements CacheProvider
public class LocalCacheProvider extends CacheConfig implements CacheProvider
{
@JsonProperty
@Min(0)
private long sizeInBytes = 0;
@JsonProperty
@Min(0)
private int initialSize = 500000;
@JsonProperty
@Min(0)
private int logEvictionCount = 0;
@Override
public Cache get()
{
return new MapCache(new ByteCountingLRUMap(initialSize, logEvictionCount, sizeInBytes));
return new MapCache(new ByteCountingLRUMap(initialSize, logEvictionCount, sizeInBytes), this);
}
}

View File

@ -33,9 +33,9 @@ import java.util.concurrent.atomic.AtomicLong;
*/
public class MapCache implements Cache
{
public static Cache create(long sizeInBytes)
public static Cache create(long sizeInBytes, CacheConfig config)
{
return new MapCache(new ByteCountingLRUMap(sizeInBytes));
return new MapCache(new ByteCountingLRUMap(sizeInBytes),config);
}
private final Map<ByteBuffer, byte[]> baseMap;
@ -48,12 +48,15 @@ public class MapCache implements Cache
private final AtomicLong hitCount = new AtomicLong(0);
private final AtomicLong missCount = new AtomicLong(0);
private final CacheConfig config;
MapCache(
ByteCountingLRUMap byteCountingLRUMap
ByteCountingLRUMap byteCountingLRUMap,
CacheConfig config
)
{
this.byteCountingLRUMap = byteCountingLRUMap;
this.config = config;
this.baseMap = Collections.synchronizedMap(byteCountingLRUMap);
@ -150,4 +153,10 @@ public class MapCache implements Cache
retVal.rewind();
return retVal;
}
@Override
public CacheConfig getCacheConfig()
{
return config;
}
}

View File

@ -72,9 +72,7 @@ public class MemcachedCache implements Cache
.build(),
AddrUtil.getAddresses(config.getHosts())
),
config.getMemcachedPrefix(),
config.getTimeout(),
config.getExpiration()
config
);
} catch(IOException e) {
throw Throwables.propagate(e);
@ -92,15 +90,18 @@ public class MemcachedCache implements Cache
private final AtomicLong timeoutCount = new AtomicLong(0);
private final AtomicLong errorCount = new AtomicLong(0);
MemcachedCache(MemcachedClientIF client, String memcachedPrefix, int timeout, int expiration) {
Preconditions.checkArgument(memcachedPrefix.length() <= MAX_PREFIX_LENGTH,
private final CacheConfig config;
MemcachedCache(MemcachedClientIF client, MemcachedCacheConfig config) {
Preconditions.checkArgument(config.getMemcachedPrefix().length() <= MAX_PREFIX_LENGTH,
"memcachedPrefix length [%d] exceeds maximum length [%d]",
memcachedPrefix.length(),
config.getMemcachedPrefix().length(),
MAX_PREFIX_LENGTH);
this.timeout = timeout;
this.expiration = expiration;
this.timeout = config.getTimeout();
this.expiration = config.getExpiration();
this.client = client;
this.memcachedPrefix = memcachedPrefix;
this.memcachedPrefix = config.getMemcachedPrefix();
this.config = config;
}
@Override
@ -117,6 +118,12 @@ public class MemcachedCache implements Cache
);
}
@Override
public CacheConfig getCacheConfig()
{
return config;
}
@Override
public byte[] get(NamedKey key)
{

View File

@ -23,7 +23,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import javax.validation.constraints.NotNull;
public class MemcachedCacheConfig
public class MemcachedCacheConfig extends CacheConfig
{
@JsonProperty
private int expiration = 2592000; // What is this number?

View File

@ -19,6 +19,7 @@
package io.druid.server.coordination;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Predicates;
import com.google.common.collect.Ordering;
@ -28,8 +29,11 @@ import com.metamx.common.guava.FunctionalIterable;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.client.CachePopulatingQueryRunner;
import io.druid.client.cache.Cache;
import io.druid.collections.CountingMap;
import io.druid.guice.annotations.Processing;
import io.druid.guice.annotations.Smile;
import io.druid.query.BySegmentQueryRunner;
import io.druid.query.DataSource;
import io.druid.query.FinalizeResultsQueryRunner;
@ -44,7 +48,6 @@ import io.druid.query.QueryToolChest;
import io.druid.query.ReferenceCountingSegmentQueryRunner;
import io.druid.query.SegmentDescriptor;
import io.druid.query.TableDataSource;
import io.druid.query.spec.QuerySegmentSpec;
import io.druid.query.spec.SpecificSegmentQueryRunner;
import io.druid.query.spec.SpecificSegmentSpec;
import io.druid.segment.ReferenceCountingSegment;
@ -70,24 +73,25 @@ import java.util.concurrent.ExecutorService;
public class ServerManager implements QuerySegmentWalker
{
private static final EmittingLogger log = new EmittingLogger(ServerManager.class);
private final Object lock = new Object();
private final SegmentLoader segmentLoader;
private final QueryRunnerFactoryConglomerate conglomerate;
private final ServiceEmitter emitter;
private final ExecutorService exec;
private final Map<String, VersionedIntervalTimeline<String, ReferenceCountingSegment>> dataSources;
private final CountingMap<String> dataSourceSizes = new CountingMap<String>();
private final CountingMap<String> dataSourceCounts = new CountingMap<String>();
private final Cache cache;
private final ObjectMapper objectMapper;
@Inject
public ServerManager(
SegmentLoader segmentLoader,
QueryRunnerFactoryConglomerate conglomerate,
ServiceEmitter emitter,
@Processing ExecutorService exec
@Processing ExecutorService exec,
@Smile ObjectMapper objectMapper,
Cache cache
)
{
this.segmentLoader = segmentLoader;
@ -95,6 +99,8 @@ public class ServerManager implements QuerySegmentWalker
this.emitter = emitter;
this.exec = exec;
this.cache = cache;
this.objectMapper = objectMapper;
this.dataSources = new HashMap<>();
}
@ -122,7 +128,9 @@ public class ServerManager implements QuerySegmentWalker
* Load a single segment.
*
* @param segment segment to load
*
* @return true if the segment was newly loaded, false if it was already loaded
*
* @throws SegmentLoadingException if the segment cannot be loaded
*/
public boolean loadSegment(final DataSegment segment) throws SegmentLoadingException
@ -130,10 +138,12 @@ public class ServerManager implements QuerySegmentWalker
final Segment adapter;
try {
adapter = segmentLoader.getSegment(segment);
} catch (SegmentLoadingException e) {
}
catch (SegmentLoadingException e) {
try {
segmentLoader.cleanup(segment);
} catch (SegmentLoadingException e1) {
}
catch (SegmentLoadingException e1) {
// ignore
}
throw e;
@ -205,11 +215,12 @@ public class ServerManager implements QuerySegmentWalker
try {
log.info("Attempting to close segment %s", segment.getIdentifier());
oldQueryable.close();
} catch (IOException e) {
}
catch (IOException e) {
log.makeAlert(e, "Exception closing segment")
.addData("dataSource", dataSource)
.addData("segmentId", segment.getIdentifier())
.emit();
.addData("dataSource", dataSource)
.addData("segmentId", segment.getIdentifier())
.emit();
}
} else {
log.info(
@ -241,7 +252,8 @@ public class ServerManager implements QuerySegmentWalker
String dataSourceName;
try {
dataSourceName = ((TableDataSource) query.getDataSource()).getName();
} catch (ClassCastException e) {
}
catch (ClassCastException e) {
throw new UnsupportedOperationException("Subqueries are only supported in the broker");
}
@ -287,13 +299,13 @@ public class ServerManager implements QuerySegmentWalker
factory,
toolChest,
input.getObject(),
new SpecificSegmentSpec(
new SegmentDescriptor(
holder.getInterval(),
holder.getVersion(),
input.getChunkNumber()
)
new SegmentDescriptor(
holder.getInterval(),
holder.getVersion(),
input.getChunkNumber()
)
);
}
}
@ -316,8 +328,8 @@ public class ServerManager implements QuerySegmentWalker
final QueryRunnerFactory<T, Query<T>> factory = conglomerate.findFactory(query);
if (factory == null) {
log.makeAlert("Unknown query type, [%s]", query.getClass())
.addData("dataSource", query.getDataSource())
.emit();
.addData("dataSource", query.getDataSource())
.emit();
return new NoopQueryRunner<T>();
}
@ -326,7 +338,8 @@ public class ServerManager implements QuerySegmentWalker
String dataSourceName;
try {
dataSourceName = ((TableDataSource) query.getDataSource()).getName();
} catch (ClassCastException e) {
}
catch (ClassCastException e) {
throw new UnsupportedOperationException("Subqueries are only supported in the broker");
}
@ -360,7 +373,7 @@ public class ServerManager implements QuerySegmentWalker
final ReferenceCountingSegment adapter = chunk.getObject();
return Arrays.asList(
buildAndDecorateQueryRunner(factory, toolChest, adapter, new SpecificSegmentSpec(input))
buildAndDecorateQueryRunner(factory, toolChest, adapter, input)
);
}
}
@ -376,9 +389,10 @@ public class ServerManager implements QuerySegmentWalker
final QueryRunnerFactory<T, Query<T>> factory,
final QueryToolChest<T, Query<T>> toolChest,
final ReferenceCountingSegment adapter,
final QuerySegmentSpec segmentSpec
final SegmentDescriptor segmentDescriptor
)
{
SpecificSegmentSpec segmentSpec = new SpecificSegmentSpec(segmentDescriptor);
return new SpecificSegmentQueryRunner<T>(
new MetricsEmittingQueryRunner<T>(
emitter,
@ -393,7 +407,14 @@ public class ServerManager implements QuerySegmentWalker
new BySegmentQueryRunner<T>(
adapter.getIdentifier(),
adapter.getDataInterval().getStart(),
new ReferenceCountingSegmentQueryRunner<T>(factory, adapter)
new CachePopulatingQueryRunner<T>(
adapter.getIdentifier(),
segmentDescriptor,
objectMapper,
cache,
toolChest,
new ReferenceCountingSegmentQueryRunner<T>(factory, adapter)
)
)
).withWaitMeasuredFromNow(),
segmentSpec

View File

@ -36,6 +36,7 @@ import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.common.guava.nary.TrinaryFn;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.client.cache.MapCache;
import io.druid.client.selector.QueryableDruidServer;
import io.druid.client.selector.RandomServerSelectorStrategy;
@ -195,7 +196,7 @@ public class CachingClusteredClientTest
{
timeline = new VersionedIntervalTimeline<>(Ordering.<String>natural());
serverView = EasyMock.createStrictMock(TimelineServerView.class);
cache = MapCache.create(100000);
cache = MapCache.create(100000, new CacheConfig());
client = makeClient();

View File

@ -37,7 +37,7 @@ public class MapCacheTest
public void setUp() throws Exception
{
baseMap = new ByteCountingLRUMap(1024 * 1024);
cache = new MapCache(baseMap);
cache = new MapCache(baseMap, new CacheConfig());
}
@Test

View File

@ -38,19 +38,23 @@ import java.util.concurrent.TimeUnit;
public class MemcachedCacheBenchmark extends SimpleBenchmark
{
private static final String BASE_KEY = "test_2012-11-26T00:00:00.000Z_2012-11-27T00:00:00.000Z_2012-11-27T04:11:25.979Z_";
public static final String NAMESPACE = "default";
private static final String BASE_KEY = "test_2012-11-26T00:00:00.000Z_2012-11-27T00:00:00.000Z_2012-11-27T04:11:25.979Z_";
private static byte[] randBytes;
@Param({"localhost:11211"})
String hosts;
// object size in kB
@Param({"1", "5", "10", "40"})
int objectSize;
@Param({"100", "1000"})
int objectCount;
private MemcachedCache cache;
private MemcachedClientIF client;
private static byte[] randBytes;
@Param({"localhost:11211"}) String hosts;
// object size in kB
@Param({"1", "5", "10", "40"}) int objectSize;
@Param({"100", "1000"}) int objectCount;
public static void main(String[] args) throws Exception
{
Runner.main(MemcachedCacheBenchmark.class, args);
}
@Override
protected void setUp() throws Exception
@ -73,11 +77,29 @@ public class MemcachedCacheBenchmark extends SimpleBenchmark
AddrUtil.getAddresses(hosts)
);
cache = new MemcachedCache(
client,
"druid-memcached-benchmark",
30000, // 30 seconds
3600 // 1 hour
new MemcachedCacheConfig()
{
@Override
public String getMemcachedPrefix()
{
return "druid-memcached-benchmark";
}
@Override
public int getTimeout()
{
return 30000;
}
@Override
public int getExpiration()
{
return 3600;
}
}
);
randBytes = new byte[objectSize * 1024];
@ -90,9 +112,10 @@ public class MemcachedCacheBenchmark extends SimpleBenchmark
client.shutdown(1, TimeUnit.MINUTES);
}
public void timePutObjects(int reps) {
for(int i = 0; i < reps; ++i) {
for(int k = 0; k < objectCount; ++k) {
public void timePutObjects(int reps)
{
for (int i = 0; i < reps; ++i) {
for (int k = 0; k < objectCount; ++k) {
String key = BASE_KEY + k;
cache.put(new Cache.NamedKey(NAMESPACE, key.getBytes()), randBytes);
}
@ -101,11 +124,12 @@ public class MemcachedCacheBenchmark extends SimpleBenchmark
}
}
public long timeGetObject(int reps) {
public long timeGetObject(int reps)
{
byte[] bytes = null;
long count = 0;
for (int i = 0; i < reps; i++) {
for(int k = 0; k < objectCount; ++k) {
for (int k = 0; k < objectCount; ++k) {
String key = BASE_KEY + k;
bytes = cache.get(new Cache.NamedKey(NAMESPACE, key.getBytes()));
count += bytes.length;
@ -114,24 +138,21 @@ public class MemcachedCacheBenchmark extends SimpleBenchmark
return count;
}
public long timeBulkGetObjects(int reps) {
public long timeBulkGetObjects(int reps)
{
long count = 0;
for (int i = 0; i < reps; i++) {
List<Cache.NamedKey> keys = Lists.newArrayList();
for(int k = 0; k < objectCount; ++k) {
for (int k = 0; k < objectCount; ++k) {
String key = BASE_KEY + k;
keys.add(new Cache.NamedKey(NAMESPACE, key.getBytes()));
}
Map<Cache.NamedKey, byte[]> results = cache.getBulk(keys);
for(Cache.NamedKey key : keys) {
for (Cache.NamedKey key : keys) {
byte[] bytes = results.get(key);
count += bytes.length;
}
}
return count;
}
public static void main(String[] args) throws Exception {
Runner.main(MemcachedCacheBenchmark.class, args);
}
}

View File

@ -60,7 +60,29 @@ public class MemcachedCacheTest
public void setUp() throws Exception
{
MemcachedClientIF client = new MockMemcachedClient();
cache = new MemcachedCache(client, "druid-memcached-test", 500, 3600);
cache = new MemcachedCache(
client, new MemcachedCacheConfig()
{
@Override
public String getMemcachedPrefix()
{
return "druid-memcached-test";
}
@Override
public int getTimeout()
{
return 500;
}
@Override
public int getExpiration()
{
return 3600;
}
}
);
}
@Test
@ -396,64 +418,64 @@ class MockMemcachedClient implements MemcachedClientIF
public <T> BulkFuture<Map<String, T>> asyncGetBulk(final Iterator<String> keys, final Transcoder<T> tc)
{
return new BulkFuture<Map<String, T>>()
{
@Override
public boolean isTimeout()
{
return false;
}
{
@Override
public boolean isTimeout()
{
return false;
}
@Override
public Map<String, T> getSome(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException
{
return get();
}
@Override
public Map<String, T> getSome(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException
{
return get();
}
@Override
public OperationStatus getStatus()
{
return null;
}
@Override
public OperationStatus getStatus()
{
return null;
}
@Override
public boolean cancel(boolean b)
{
return false;
}
@Override
public boolean cancel(boolean b)
{
return false;
}
@Override
public boolean isCancelled()
{
return false;
}
@Override
public boolean isCancelled()
{
return false;
}
@Override
public boolean isDone()
{
return true;
}
@Override
public boolean isDone()
{
return true;
}
@Override
public Map<String, T> get() throws InterruptedException, ExecutionException
{
Map<String, T> retVal = Maps.newHashMap();
@Override
public Map<String, T> get() throws InterruptedException, ExecutionException
{
Map<String, T> retVal = Maps.newHashMap();
while(keys.hasNext()) {
String key = keys.next();
CachedData data = theMap.get(key);
retVal.put(key, data != null ? tc.decode(data) : null);
}
while (keys.hasNext()) {
String key = keys.next();
CachedData data = theMap.get(key);
retVal.put(key, data != null ? tc.decode(data) : null);
}
return retVal;
}
return retVal;
}
@Override
public Map<String, T> get(long l, TimeUnit timeUnit)
throws InterruptedException, ExecutionException, TimeoutException
{
return get();
}
};
@Override
public Map<String, T> get(long l, TimeUnit timeUnit)
throws InterruptedException, ExecutionException, TimeoutException
{
return get();
}
};
}
@Override

View File

@ -37,7 +37,9 @@ import com.metamx.common.guava.YieldingAccumulator;
import com.metamx.common.guava.YieldingSequenceBase;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.client.cache.LocalCacheProvider;
import io.druid.granularity.QueryGranularity;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.ConcatQueryRunner;
import io.druid.query.Druids;
import io.druid.query.NoopQueryRunner;
@ -136,7 +138,7 @@ public class ServerManagerTest
}
},
new NoopServiceEmitter(),
serverManagerExec
serverManagerExec, new DefaultObjectMapper(), new LocalCacheProvider().get()
);
loadQueryable("test", "1", new Interval("P1d/2011-04-01"));
@ -592,9 +594,7 @@ public class ServerManagerTest
{
private final String version;
private final Interval interval;
private final Object lock = new Object();
private volatile boolean closed = false;
SegmentForTesting(

View File

@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.common.logger.Logger;
import io.druid.client.cache.LocalCacheProvider;
import io.druid.concurrent.Execs;
import io.druid.curator.CuratorTestBase;
import io.druid.curator.announcement.Announcer;
@ -52,12 +53,12 @@ import java.util.List;
*/
public class ZkCoordinatorTest extends CuratorTestBase
{
private static final Logger log = new Logger(ZkCoordinatorTest.class);
private final ObjectMapper jsonMapper = new DefaultObjectMapper();
private ZkCoordinator zkCoordinator;
private ServerManager serverManager;
private DataSegmentAnnouncer announcer;
private File infoDir;
private final ObjectMapper jsonMapper = new DefaultObjectMapper();
private static final Logger log = new Logger(ZkCoordinatorTest.class);
@Before
public void setUp() throws Exception
@ -80,7 +81,9 @@ public class ZkCoordinatorTest extends CuratorTestBase
new CacheTestSegmentLoader(),
new NoopQueryRunnerFactoryConglomerate(),
new NoopServiceEmitter(),
MoreExecutors.sameThreadExecutor()
MoreExecutors.sameThreadExecutor(),
new DefaultObjectMapper(),
new LocalCacheProvider().get()
);
final DruidServerMetadata me = new DruidServerMetadata("dummyServer", "dummyHost", 0, "dummyType", "normal", 0);
@ -100,7 +103,8 @@ public class ZkCoordinatorTest extends CuratorTestBase
zkCoordinator = new ZkCoordinator(
jsonMapper,
new SegmentLoaderConfig(){
new SegmentLoaderConfig()
{
@Override
public File getInfoDir()
{
@ -214,7 +218,7 @@ public class ZkCoordinatorTest extends CuratorTestBase
List<File> sortedFiles = Lists.newArrayList(files);
Collections.sort(sortedFiles);
Assert.assertEquals(segments.size(), sortedFiles.size());
for (int i = 0; i < sortedFiles.size(); i++) {
DataSegment segment = jsonMapper.readValue(sortedFiles.get(i), DataSegment.class);

View File

@ -24,7 +24,10 @@ import com.google.inject.Binder;
import com.google.inject.Module;
import com.metamx.common.logger.Logger;
import io.airlift.command.Command;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheProvider;
import io.druid.guice.Jerseys;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.LazySingleton;
import io.druid.guice.LifecycleModule;
import io.druid.guice.ManageLifecycle;
@ -73,6 +76,8 @@ public class CliHistorical extends ServerRunnable
LifecycleModule.register(binder, ZkCoordinator.class);
LifecycleModule.register(binder, Server.class);
binder.bind(Cache.class).toProvider(CacheProvider.class).in(ManageLifecycle.class);
JsonConfigProvider.bind(binder, "druid.historical.cache", CacheProvider.class);
}
}
);