rename cacheBroker -> cache

This commit is contained in:
xvrl 2013-01-18 15:22:56 -08:00
parent 86ca8967ca
commit e0c34c3b97
11 changed files with 81 additions and 85 deletions

View File

@ -41,7 +41,7 @@ import com.metamx.druid.Query;
import com.metamx.druid.TimelineObjectHolder; import com.metamx.druid.TimelineObjectHolder;
import com.metamx.druid.VersionedIntervalTimeline; import com.metamx.druid.VersionedIntervalTimeline;
import com.metamx.druid.aggregation.AggregatorFactory; import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.client.cache.CacheBroker; import com.metamx.druid.client.cache.Cache;
import com.metamx.druid.client.selector.ServerSelector; import com.metamx.druid.client.selector.ServerSelector;
import com.metamx.druid.partition.PartitionChunk; import com.metamx.druid.partition.PartitionChunk;
import com.metamx.druid.query.CacheStrategy; import com.metamx.druid.query.CacheStrategy;
@ -57,7 +57,6 @@ import org.codehaus.jackson.map.ObjectMapper;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.joda.time.Interval; import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
@ -76,19 +75,19 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
private final QueryToolChestWarehouse warehouse; private final QueryToolChestWarehouse warehouse;
private final ServerView serverView; private final ServerView serverView;
private final CacheBroker cacheBroker; private final Cache cache;
private final ObjectMapper objectMapper; private final ObjectMapper objectMapper;
public CachingClusteredClient( public CachingClusteredClient(
QueryToolChestWarehouse warehouse, QueryToolChestWarehouse warehouse,
ServerView serverView, ServerView serverView,
CacheBroker cacheBroker, Cache cache,
ObjectMapper objectMapper ObjectMapper objectMapper
) )
{ {
this.warehouse = warehouse; this.warehouse = warehouse;
this.serverView = serverView; this.serverView = serverView;
this.cacheBroker = cacheBroker; this.cache = cache;
this.objectMapper = objectMapper; this.objectMapper = objectMapper;
serverView.registerSegmentCallback( serverView.registerSegmentCallback(
@ -100,7 +99,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
@Override @Override
public ServerView.CallbackAction segmentRemoved(DruidServer server, DataSegment segment) public ServerView.CallbackAction segmentRemoved(DruidServer server, DataSegment segment)
{ {
CachingClusteredClient.this.cacheBroker.close(segment.getIdentifier()); CachingClusteredClient.this.cache.close(segment.getIdentifier());
return ServerView.CallbackAction.CONTINUE; return ServerView.CallbackAction.CONTINUE;
} }
} }
@ -161,16 +160,16 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
// Pull cached segments from cache and remove from set of segments to query // Pull cached segments from cache and remove from set of segments to query
if(useCache && queryCacheKey != null) { if(useCache && queryCacheKey != null) {
Map<Pair<ServerSelector, SegmentDescriptor>, CacheBroker.NamedKey> cacheKeys = Maps.newHashMap(); Map<Pair<ServerSelector, SegmentDescriptor>, Cache.NamedKey> cacheKeys = Maps.newHashMap();
for(Pair<ServerSelector, SegmentDescriptor> e : segments) { for(Pair<ServerSelector, SegmentDescriptor> e : segments) {
cacheKeys.put(e, computeSegmentCacheKey(e.lhs.getSegment().getIdentifier(), e.rhs, queryCacheKey)); cacheKeys.put(e, computeSegmentCacheKey(e.lhs.getSegment().getIdentifier(), e.rhs, queryCacheKey));
} }
Map<CacheBroker.NamedKey, byte[]> cachedValues = cacheBroker.getBulk(cacheKeys.values()); Map<Cache.NamedKey, byte[]> cachedValues = cache.getBulk(cacheKeys.values());
for(Map.Entry<Pair<ServerSelector, SegmentDescriptor>, CacheBroker.NamedKey> entry : cacheKeys.entrySet()) { for(Map.Entry<Pair<ServerSelector, SegmentDescriptor>, Cache.NamedKey> entry : cacheKeys.entrySet()) {
Pair<ServerSelector, SegmentDescriptor> segment = entry.getKey(); Pair<ServerSelector, SegmentDescriptor> segment = entry.getKey();
CacheBroker.NamedKey segmentCacheKey = entry.getValue(); Cache.NamedKey segmentCacheKey = entry.getValue();
final ServerSelector selector = segment.lhs; final ServerSelector selector = segment.lhs;
final SegmentDescriptor descriptor = segment.rhs; final SegmentDescriptor descriptor = segment.rhs;
@ -188,7 +187,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
final String segmentIdentifier = selector.getSegment().getIdentifier(); final String segmentIdentifier = selector.getSegment().getIdentifier();
cachePopulatorMap.put( cachePopulatorMap.put(
String.format("%s_%s", segmentIdentifier, segmentQueryInterval), String.format("%s_%s", segmentIdentifier, segmentQueryInterval),
new CachePopulator(cacheBroker, objectMapper, segmentCacheKey) new CachePopulator(cache, objectMapper, segmentCacheKey)
); );
} }
} }
@ -341,12 +340,12 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
); );
} }
private CacheBroker.NamedKey computeSegmentCacheKey(String segmentIdentifier, SegmentDescriptor descriptor, byte[] queryCacheKey) private Cache.NamedKey computeSegmentCacheKey(String segmentIdentifier, SegmentDescriptor descriptor, byte[] queryCacheKey)
{ {
final Interval segmentQueryInterval = descriptor.getInterval(); final Interval segmentQueryInterval = descriptor.getInterval();
final byte[] versionBytes = descriptor.getVersion().getBytes(); final byte[] versionBytes = descriptor.getVersion().getBytes();
return new CacheBroker.NamedKey( return new Cache.NamedKey(
segmentIdentifier, ByteBuffer segmentIdentifier, ByteBuffer
.allocate(16 + versionBytes.length + 4 + queryCacheKey.length) .allocate(16 + versionBytes.length + 4 + queryCacheKey.length)
.putLong(segmentQueryInterval.getStartMillis()) .putLong(segmentQueryInterval.getStartMillis())
@ -359,11 +358,11 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
private static class CachePopulator private static class CachePopulator
{ {
private final CacheBroker cache; private final Cache cache;
private final ObjectMapper mapper; private final ObjectMapper mapper;
private final CacheBroker.NamedKey key; private final Cache.NamedKey key;
public CachePopulator(CacheBroker cache, ObjectMapper mapper, CacheBroker.NamedKey key) public CachePopulator(Cache cache, ObjectMapper mapper, Cache.NamedKey key)
{ {
this.cache = cache; this.cache = cache;
this.mapper = mapper; this.mapper = mapper;

View File

@ -26,7 +26,7 @@ import java.util.Map;
/** /**
*/ */
public interface CacheBroker public interface Cache
{ {
public byte[] get(NamedKey key); public byte[] get(NamedKey key);
public void put(NamedKey key, byte[] value); public void put(NamedKey key, byte[] value);

View File

@ -27,21 +27,21 @@ import com.metamx.metrics.AbstractMonitor;
*/ */
public class CacheMonitor extends AbstractMonitor public class CacheMonitor extends AbstractMonitor
{ {
private final CacheBroker cacheBroker; private final Cache cache;
private volatile CacheStats prevCacheStats = null; private volatile CacheStats prevCacheStats = null;
public CacheMonitor( public CacheMonitor(
CacheBroker cacheBroker Cache cache
) )
{ {
this.cacheBroker = cacheBroker; this.cache = cache;
} }
@Override @Override
public boolean doMonitor(ServiceEmitter emitter) public boolean doMonitor(ServiceEmitter emitter)
{ {
final CacheStats currCacheStats = cacheBroker.getStats(); final CacheStats currCacheStats = cache.getStats();
final CacheStats deltaCacheStats = currCacheStats.delta(prevCacheStats); final CacheStats deltaCacheStats = currCacheStats.delta(prevCacheStats);
final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder(); final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder();

View File

@ -22,7 +22,6 @@ package com.metamx.druid.client.cache;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.primitives.Ints; import com.google.common.primitives.Ints;
import com.metamx.common.ISE; import com.metamx.common.ISE;
import com.metamx.common.Pair;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Collections; import java.util.Collections;
@ -33,7 +32,7 @@ import java.util.concurrent.atomic.AtomicLong;
/** /**
*/ */
public class MapCacheBroker implements CacheBroker public class MapCache implements Cache
{ {
/** /**
* An interface to limit the operations that can be done on a Cache so that it is easier to reason about what * An interface to limit the operations that can be done on a Cache so that it is easier to reason about what
@ -57,9 +56,9 @@ public class MapCacheBroker implements CacheBroker
private final AtomicLong hitCount = new AtomicLong(0); private final AtomicLong hitCount = new AtomicLong(0);
private final AtomicLong missCount = new AtomicLong(0); private final AtomicLong missCount = new AtomicLong(0);
public static CacheBroker create(final MapCacheBrokerConfig config) public static com.metamx.druid.client.cache.Cache create(final MapCacheConfig config)
{ {
return new MapCacheBroker( return new MapCache(
new ByteCountingLRUMap( new ByteCountingLRUMap(
config.getInitialSize(), config.getInitialSize(),
config.getLogEvictionCount(), config.getLogEvictionCount(),
@ -68,7 +67,7 @@ public class MapCacheBroker implements CacheBroker
); );
} }
MapCacheBroker( MapCache(
ByteCountingLRUMap byteCountingLRUMap ByteCountingLRUMap byteCountingLRUMap
) )
{ {

View File

@ -24,7 +24,7 @@ import org.skife.config.Default;
/** /**
*/ */
public abstract class MapCacheBrokerConfig public abstract class MapCacheConfig
{ {
@Config("${prefix}.sizeInBytes") @Config("${prefix}.sizeInBytes")
@Default("0") @Default("0")

View File

@ -41,16 +41,16 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
public class MemcachedCacheBroker implements CacheBroker public class MemcachedCache implements Cache
{ {
public static MemcachedCacheBroker create(final MemcachedCacheBrokerConfig config) public static MemcachedCache create(final MemcachedCacheConfig config)
{ {
try { try {
SerializingTranscoder transcoder = new SerializingTranscoder(config.getMaxObjectSize()); SerializingTranscoder transcoder = new SerializingTranscoder(config.getMaxObjectSize());
// disable compression // disable compression
transcoder.setCompressionThreshold(Integer.MAX_VALUE); transcoder.setCompressionThreshold(Integer.MAX_VALUE);
return new MemcachedCacheBroker( return new MemcachedCache(
new MemcachedClient( new MemcachedClient(
new ConnectionFactoryBuilder().setProtocol(ConnectionFactoryBuilder.Protocol.BINARY) new ConnectionFactoryBuilder().setProtocol(ConnectionFactoryBuilder.Protocol.BINARY)
.setHashAlg(DefaultHashAlgorithm.FNV1A_64_HASH) .setHashAlg(DefaultHashAlgorithm.FNV1A_64_HASH)
@ -79,7 +79,7 @@ public class MemcachedCacheBroker implements CacheBroker
private final AtomicLong missCount = new AtomicLong(0); private final AtomicLong missCount = new AtomicLong(0);
private final AtomicLong timeoutCount = new AtomicLong(0); private final AtomicLong timeoutCount = new AtomicLong(0);
MemcachedCacheBroker(MemcachedClientIF client, int timeout, int expiration) { MemcachedCache(MemcachedClientIF client, int timeout, int expiration) {
this.timeout = timeout; this.timeout = timeout;
this.expiration = expiration; this.expiration = expiration;
this.client = client; this.client = client;

View File

@ -3,7 +3,7 @@ package com.metamx.druid.client.cache;
import org.skife.config.Config; import org.skife.config.Config;
import org.skife.config.Default; import org.skife.config.Default;
public abstract class MemcachedCacheBrokerConfig public abstract class MemcachedCacheConfig
{ {
@Config("${prefix}.expiration") @Config("${prefix}.expiration")
@Default("31536000") @Default("31536000")

View File

@ -34,13 +34,13 @@ import com.metamx.druid.client.BrokerServerView;
import com.metamx.druid.client.CachingClusteredClient; import com.metamx.druid.client.CachingClusteredClient;
import com.metamx.druid.client.ClientConfig; import com.metamx.druid.client.ClientConfig;
import com.metamx.druid.client.ClientInventoryManager; import com.metamx.druid.client.ClientInventoryManager;
import com.metamx.druid.client.cache.CacheBroker; import com.metamx.druid.client.cache.Cache;
import com.metamx.druid.client.cache.CacheConfig; import com.metamx.druid.client.cache.CacheConfig;
import com.metamx.druid.client.cache.CacheMonitor; import com.metamx.druid.client.cache.CacheMonitor;
import com.metamx.druid.client.cache.MapCacheBroker; import com.metamx.druid.client.cache.MapCache;
import com.metamx.druid.client.cache.MapCacheBrokerConfig; import com.metamx.druid.client.cache.MapCacheConfig;
import com.metamx.druid.client.cache.MemcachedCacheBroker; import com.metamx.druid.client.cache.MemcachedCache;
import com.metamx.druid.client.cache.MemcachedCacheBrokerConfig; import com.metamx.druid.client.cache.MemcachedCacheConfig;
import com.metamx.druid.initialization.Initialization; import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServiceDiscoveryConfig; import com.metamx.druid.initialization.ServiceDiscoveryConfig;
import com.metamx.druid.jackson.DefaultObjectMapper; import com.metamx.druid.jackson.DefaultObjectMapper;
@ -78,7 +78,7 @@ public class BrokerNode extends QueryableNode<BrokerNode>
private QueryToolChestWarehouse warehouse = null; private QueryToolChestWarehouse warehouse = null;
private HttpClient brokerHttpClient = null; private HttpClient brokerHttpClient = null;
private CacheBroker cacheBroker = null; private Cache cache = null;
private boolean useDiscovery = true; private boolean useDiscovery = true;
@ -122,15 +122,15 @@ public class BrokerNode extends QueryableNode<BrokerNode>
return this; return this;
} }
public CacheBroker getCacheBroker() public Cache getCache()
{ {
initializeCacheBroker(); initializeCacheBroker();
return cacheBroker; return cache;
} }
public BrokerNode setCacheBroker(CacheBroker cacheBroker) public BrokerNode setCache(Cache cache)
{ {
checkFieldNotSetAndSet("cacheBroker", cacheBroker); checkFieldNotSetAndSet("cache", cache);
return this; return this;
} }
@ -185,7 +185,7 @@ public class BrokerNode extends QueryableNode<BrokerNode>
final Lifecycle lifecycle = getLifecycle(); final Lifecycle lifecycle = getLifecycle();
final List<Monitor> monitors = getMonitors(); final List<Monitor> monitors = getMonitors();
monitors.add(new CacheMonitor(cacheBroker)); monitors.add(new CacheMonitor(cache));
startMonitoring(monitors); startMonitoring(monitors);
final BrokerServerView view = new BrokerServerView(warehouse, getSmileMapper(), brokerHttpClient); final BrokerServerView view = new BrokerServerView(warehouse, getSmileMapper(), brokerHttpClient);
@ -194,7 +194,7 @@ public class BrokerNode extends QueryableNode<BrokerNode>
); );
lifecycle.addManagedInstance(clientInventoryManager); lifecycle.addManagedInstance(clientInventoryManager);
final CachingClusteredClient baseClient = new CachingClusteredClient(warehouse, view, cacheBroker, getSmileMapper()); final CachingClusteredClient baseClient = new CachingClusteredClient(warehouse, view, cache, getSmileMapper());
lifecycle.addManagedInstance(baseClient); lifecycle.addManagedInstance(baseClient);
@ -239,25 +239,25 @@ public class BrokerNode extends QueryableNode<BrokerNode>
private void initializeCacheBroker() private void initializeCacheBroker()
{ {
if (cacheBroker == null) { if (cache == null) {
String cacheType = getConfigFactory() String cacheType = getConfigFactory()
.build(CacheConfig.class) .build(CacheConfig.class)
.getType(); .getType();
if (cacheType.equals(CACHE_TYPE_LOCAL)) { if (cacheType.equals(CACHE_TYPE_LOCAL)) {
setCacheBroker( setCache(
MapCacheBroker.create( MapCache.create(
getConfigFactory().buildWithReplacements( getConfigFactory().buildWithReplacements(
MapCacheBrokerConfig.class, MapCacheConfig.class,
ImmutableMap.of("prefix", CACHE_PROPERTY_PREFIX) ImmutableMap.of("prefix", CACHE_PROPERTY_PREFIX)
) )
) )
); );
} else if (cacheType.equals(CACHE_TYPE_MEMCACHED)) { } else if (cacheType.equals(CACHE_TYPE_MEMCACHED)) {
setCacheBroker( setCache(
MemcachedCacheBroker.create( MemcachedCache.create(
getConfigFactory().buildWithReplacements( getConfigFactory().buildWithReplacements(
MemcachedCacheBrokerConfig.class, MemcachedCacheConfig.class,
ImmutableMap.of("prefix", CACHE_PROPERTY_PREFIX) ImmutableMap.of("prefix", CACHE_PROPERTY_PREFIX)
) )
) )

View File

@ -31,24 +31,24 @@ public class MapCacheBrokerTest
private static final byte[] HI = "hi".getBytes(); private static final byte[] HI = "hi".getBytes();
private static final byte[] HO = "ho".getBytes(); private static final byte[] HO = "ho".getBytes();
private ByteCountingLRUMap baseMap; private ByteCountingLRUMap baseMap;
private MapCacheBroker cache; private MapCache cache;
@Before @Before
public void setUp() throws Exception public void setUp() throws Exception
{ {
baseMap = new ByteCountingLRUMap(1024 * 1024); baseMap = new ByteCountingLRUMap(1024 * 1024);
cache = new MapCacheBroker(baseMap); cache = new MapCache(baseMap);
} }
@Test @Test
public void testSanity() throws Exception public void testSanity() throws Exception
{ {
Assert.assertNull(cache.get(new CacheBroker.NamedKey("a", HI))); Assert.assertNull(cache.get(new Cache.NamedKey("a", HI)));
Assert.assertEquals(0, baseMap.size()); Assert.assertEquals(0, baseMap.size());
put(cache, "a", HI, 1); put(cache, "a", HI, 1);
Assert.assertEquals(1, baseMap.size()); Assert.assertEquals(1, baseMap.size());
Assert.assertEquals(1, get(cache, "a", HI)); Assert.assertEquals(1, get(cache, "a", HI));
Assert.assertNull(cache.get(new CacheBroker.NamedKey("the", HI))); Assert.assertNull(cache.get(new Cache.NamedKey("the", HI)));
put(cache, "the", HI, 2); put(cache, "the", HI, 2);
Assert.assertEquals(2, baseMap.size()); Assert.assertEquals(2, baseMap.size());
@ -58,26 +58,26 @@ public class MapCacheBrokerTest
put(cache, "the", HO, 10); put(cache, "the", HO, 10);
Assert.assertEquals(3, baseMap.size()); Assert.assertEquals(3, baseMap.size());
Assert.assertEquals(1, get(cache, "a", HI)); Assert.assertEquals(1, get(cache, "a", HI));
Assert.assertNull(cache.get(new CacheBroker.NamedKey("a", HO))); Assert.assertNull(cache.get(new Cache.NamedKey("a", HO)));
Assert.assertEquals(2, get(cache, "the", HI)); Assert.assertEquals(2, get(cache, "the", HI));
Assert.assertEquals(10, get(cache, "the", HO)); Assert.assertEquals(10, get(cache, "the", HO));
cache.close("the"); cache.close("the");
Assert.assertEquals(1, baseMap.size()); Assert.assertEquals(1, baseMap.size());
Assert.assertEquals(1, get(cache, "a", HI)); Assert.assertEquals(1, get(cache, "a", HI));
Assert.assertNull(cache.get(new CacheBroker.NamedKey("a", HO))); Assert.assertNull(cache.get(new Cache.NamedKey("a", HO)));
cache.close("a"); cache.close("a");
Assert.assertEquals(0, baseMap.size()); Assert.assertEquals(0, baseMap.size());
} }
public void put(CacheBroker cache, String namespace, byte[] key, Integer value) public void put(Cache cache, String namespace, byte[] key, Integer value)
{ {
cache.put(new CacheBroker.NamedKey(namespace, key), Ints.toByteArray(value)); cache.put(new Cache.NamedKey(namespace, key), Ints.toByteArray(value));
} }
public int get(CacheBroker cache, String namespace, byte[] key) public int get(Cache cache, String namespace, byte[] key)
{ {
return Ints.fromByteArray(cache.get(new CacheBroker.NamedKey(namespace, key))); return Ints.fromByteArray(cache.get(new Cache.NamedKey(namespace, key)));
} }
} }

View File

@ -22,7 +22,7 @@ public class MemcachedCacheBrokerBenchmark extends SimpleBenchmark
private static final String BASE_KEY = "test_2012-11-26T00:00:00.000Z_2012-11-27T00:00:00.000Z_2012-11-27T04:11:25.979Z_"; private static final String BASE_KEY = "test_2012-11-26T00:00:00.000Z_2012-11-27T00:00:00.000Z_2012-11-27T04:11:25.979Z_";
public static final String NAMESPACE = "default"; public static final String NAMESPACE = "default";
private MemcachedCacheBroker cache; private MemcachedCache cache;
private MemcachedClientIF client; private MemcachedClientIF client;
private static byte[] randBytes; private static byte[] randBytes;
@ -54,7 +54,7 @@ public class MemcachedCacheBrokerBenchmark extends SimpleBenchmark
AddrUtil.getAddresses(hosts) AddrUtil.getAddresses(hosts)
); );
cache = new MemcachedCacheBroker( cache = new MemcachedCache(
client, client,
30000, // 30 seconds 30000, // 30 seconds
3600 // 1 hour 3600 // 1 hour
@ -75,7 +75,7 @@ public class MemcachedCacheBrokerBenchmark extends SimpleBenchmark
for(int i = 0; i < reps; ++i) { for(int i = 0; i < reps; ++i) {
for(int k = 0; k < objectCount; ++k) { for(int k = 0; k < objectCount; ++k) {
String key = BASE_KEY + k; String key = BASE_KEY + k;
cache.put(new CacheBroker.NamedKey(NAMESPACE, key.getBytes()), randBytes); cache.put(new Cache.NamedKey(NAMESPACE, key.getBytes()), randBytes);
} }
// make sure the write queue is empty // make sure the write queue is empty
client.waitForQueues(1, TimeUnit.HOURS); client.waitForQueues(1, TimeUnit.HOURS);
@ -88,7 +88,7 @@ public class MemcachedCacheBrokerBenchmark extends SimpleBenchmark
for (int i = 0; i < reps; i++) { for (int i = 0; i < reps; i++) {
for(int k = 0; k < objectCount; ++k) { for(int k = 0; k < objectCount; ++k) {
String key = BASE_KEY + k; String key = BASE_KEY + k;
bytes = cache.get(new CacheBroker.NamedKey(NAMESPACE, key.getBytes())); bytes = cache.get(new Cache.NamedKey(NAMESPACE, key.getBytes()));
count += bytes.length; count += bytes.length;
} }
} }
@ -98,13 +98,13 @@ public class MemcachedCacheBrokerBenchmark extends SimpleBenchmark
public long timeBulkGetObjects(int reps) { public long timeBulkGetObjects(int reps) {
long count = 0; long count = 0;
for (int i = 0; i < reps; i++) { for (int i = 0; i < reps; i++) {
List<CacheBroker.NamedKey> keys = Lists.newArrayList(); List<Cache.NamedKey> keys = Lists.newArrayList();
for(int k = 0; k < objectCount; ++k) { for(int k = 0; k < objectCount; ++k) {
String key = BASE_KEY + k; String key = BASE_KEY + k;
keys.add(new CacheBroker.NamedKey(NAMESPACE, key.getBytes())); keys.add(new Cache.NamedKey(NAMESPACE, key.getBytes()));
} }
Map<CacheBroker.NamedKey, byte[]> results = cache.getBulk(keys); Map<Cache.NamedKey, byte[]> results = cache.getBulk(keys);
for(CacheBroker.NamedKey key : keys) { for(Cache.NamedKey key : keys) {
byte[] bytes = results.get(key); byte[] bytes = results.get(key);
count += bytes.length; count += bytes.length;
} }

View File

@ -22,7 +22,6 @@ package com.metamx.druid.client.cache;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.primitives.Ints; import com.google.common.primitives.Ints;
import com.metamx.common.Pair;
import net.spy.memcached.CASResponse; import net.spy.memcached.CASResponse;
import net.spy.memcached.CASValue; import net.spy.memcached.CASValue;
import net.spy.memcached.CachedData; import net.spy.memcached.CachedData;
@ -38,7 +37,6 @@ import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.Collection; import java.util.Collection;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map; import java.util.Map;
@ -56,22 +54,22 @@ public class MemcachedCacheBrokerTest
{ {
private static final byte[] HI = "hi".getBytes(); private static final byte[] HI = "hi".getBytes();
private static final byte[] HO = "ho".getBytes(); private static final byte[] HO = "ho".getBytes();
private MemcachedCacheBroker cache; private MemcachedCache cache;
@Before @Before
public void setUp() throws Exception public void setUp() throws Exception
{ {
MemcachedClientIF client = new MockMemcachedClient(); MemcachedClientIF client = new MockMemcachedClient();
cache = new MemcachedCacheBroker(client, 500, 3600); cache = new MemcachedCache(client, 500, 3600);
} }
@Test @Test
public void testSanity() throws Exception public void testSanity() throws Exception
{ {
Assert.assertNull(cache.get(new CacheBroker.NamedKey("a", HI))); Assert.assertNull(cache.get(new Cache.NamedKey("a", HI)));
put(cache, "a", HI, 1); put(cache, "a", HI, 1);
Assert.assertEquals(1, get(cache, "a", HI)); Assert.assertEquals(1, get(cache, "a", HI));
Assert.assertNull(cache.get(new CacheBroker.NamedKey("the", HI))); Assert.assertNull(cache.get(new Cache.NamedKey("the", HI)));
put(cache, "the", HI, 2); put(cache, "the", HI, 2);
Assert.assertEquals(1, get(cache, "a", HI)); Assert.assertEquals(1, get(cache, "a", HI));
@ -79,13 +77,13 @@ public class MemcachedCacheBrokerTest
put(cache, "the", HO, 10); put(cache, "the", HO, 10);
Assert.assertEquals(1, get(cache, "a", HI)); Assert.assertEquals(1, get(cache, "a", HI));
Assert.assertNull(cache.get(new CacheBroker.NamedKey("a", HO))); Assert.assertNull(cache.get(new Cache.NamedKey("a", HO)));
Assert.assertEquals(2, get(cache, "the", HI)); Assert.assertEquals(2, get(cache, "the", HI));
Assert.assertEquals(10, get(cache, "the", HO)); Assert.assertEquals(10, get(cache, "the", HO));
cache.close("the"); cache.close("the");
Assert.assertEquals(1, get(cache, "a", HI)); Assert.assertEquals(1, get(cache, "a", HI));
Assert.assertNull(cache.get(new CacheBroker.NamedKey("a", HO))); Assert.assertNull(cache.get(new Cache.NamedKey("a", HO)));
cache.close("a"); cache.close("a");
} }
@ -93,15 +91,15 @@ public class MemcachedCacheBrokerTest
@Test @Test
public void testGetBulk() throws Exception public void testGetBulk() throws Exception
{ {
Assert.assertNull(cache.get(new CacheBroker.NamedKey("the", HI))); Assert.assertNull(cache.get(new Cache.NamedKey("the", HI)));
put(cache, "the", HI, 2); put(cache, "the", HI, 2);
put(cache, "the", HO, 10); put(cache, "the", HO, 10);
CacheBroker.NamedKey key1 = new CacheBroker.NamedKey("the", HI); Cache.NamedKey key1 = new Cache.NamedKey("the", HI);
CacheBroker.NamedKey key2 = new CacheBroker.NamedKey("the", HO); Cache.NamedKey key2 = new Cache.NamedKey("the", HO);
Map<CacheBroker.NamedKey, byte[]> result = cache.getBulk( Map<Cache.NamedKey, byte[]> result = cache.getBulk(
Lists.newArrayList( Lists.newArrayList(
key1, key1,
key2 key2
@ -112,14 +110,14 @@ public class MemcachedCacheBrokerTest
Assert.assertEquals(10, Ints.fromByteArray(result.get(key2))); Assert.assertEquals(10, Ints.fromByteArray(result.get(key2)));
} }
public void put(CacheBroker cache, String namespace, byte[] key, Integer value) public void put(Cache cache, String namespace, byte[] key, Integer value)
{ {
cache.put(new CacheBroker.NamedKey(namespace, key), Ints.toByteArray(value)); cache.put(new Cache.NamedKey(namespace, key), Ints.toByteArray(value));
} }
public int get(CacheBroker cache, String namespace, byte[] key) public int get(Cache cache, String namespace, byte[] key)
{ {
return Ints.fromByteArray(cache.get(new CacheBroker.NamedKey(namespace, key))); return Ints.fromByteArray(cache.get(new Cache.NamedKey(namespace, key)));
} }
} }