replace Pair<String, ByteBuffer> with NamedKey

This commit is contained in:
xvrl 2013-01-18 10:00:06 -08:00
parent 9032ef521b
commit a70ae15585
7 changed files with 115 additions and 85 deletions

View File

@ -21,6 +21,7 @@ package com.metamx.druid.client;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.base.Predicate; import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.base.Supplier; import com.google.common.base.Supplier;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
@ -133,7 +134,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
return Sequences.empty(); return Sequences.empty();
} }
Map<Pair<ServerSelector, SegmentDescriptor>, Pair<String, ByteBuffer>> segments = Maps.newLinkedHashMap(); Map<Pair<ServerSelector, SegmentDescriptor>, CacheBroker.NamedKey> segments = Maps.newLinkedHashMap();
final byte[] queryCacheKey = (strategy != null) ? strategy.computeCacheKey(query) : null; final byte[] queryCacheKey = (strategy != null) ? strategy.computeCacheKey(query) : null;
@ -156,21 +157,13 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
} }
} }
Map<Pair<String, ByteBuffer>, byte[]> cachedValues = cacheBroker.getBulk( Map<CacheBroker.NamedKey, byte[]> cachedValues = cacheBroker.getBulk(
Iterables.filter(segments.values(), new Predicate<Pair<String, ByteBuffer>>() Iterables.filter(segments.values(), Predicates.notNull())
{
@Override
public boolean apply(
@Nullable Pair<String, ByteBuffer> input
)
{
return input != null;
}
})
); );
for(Pair<ServerSelector, SegmentDescriptor> segment : segments.keySet()) { for(Map.Entry<Pair<ServerSelector, SegmentDescriptor>, CacheBroker.NamedKey> entry : segments.entrySet()) {
Pair<String, ByteBuffer> segmentCacheKey = segments.get(segment); Pair<ServerSelector, SegmentDescriptor> segment = entry.getKey();
CacheBroker.NamedKey segmentCacheKey = entry.getValue();
final ServerSelector selector = segment.lhs; final ServerSelector selector = segment.lhs;
final SegmentDescriptor descriptor = segment.rhs; final SegmentDescriptor descriptor = segment.rhs;
@ -335,19 +328,19 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
); );
} }
private Pair<String, ByteBuffer> computeSegmentCacheKey(String segmentIdentifier, SegmentDescriptor descriptor, byte[] queryCacheKey) private CacheBroker.NamedKey computeSegmentCacheKey(String segmentIdentifier, SegmentDescriptor descriptor, byte[] queryCacheKey)
{ {
final Interval segmentQueryInterval = descriptor.getInterval(); final Interval segmentQueryInterval = descriptor.getInterval();
final byte[] versionBytes = descriptor.getVersion().getBytes(); final byte[] versionBytes = descriptor.getVersion().getBytes();
return Pair.of( return new CacheBroker.NamedKey(
segmentIdentifier, ByteBuffer segmentIdentifier, ByteBuffer
.allocate(16 + versionBytes.length + 4 + queryCacheKey.length) .allocate(16 + versionBytes.length + 4 + queryCacheKey.length)
.putLong(segmentQueryInterval.getStartMillis()) .putLong(segmentQueryInterval.getStartMillis())
.putLong(segmentQueryInterval.getEndMillis()) .putLong(segmentQueryInterval.getEndMillis())
.put(versionBytes) .put(versionBytes)
.putInt(descriptor.getPartitionNumber()) .putInt(descriptor.getPartitionNumber())
.put(queryCacheKey) .put(queryCacheKey).array()
); );
} }
@ -355,9 +348,9 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
{ {
private final CacheBroker cache; private final CacheBroker cache;
private final ObjectMapper mapper; private final ObjectMapper mapper;
private final Pair<String, ByteBuffer> key; private final CacheBroker.NamedKey key;
public CachePopulator(CacheBroker cache, ObjectMapper mapper, Pair<String, ByteBuffer> key) public CachePopulator(CacheBroker cache, ObjectMapper mapper, CacheBroker.NamedKey key)
{ {
this.cache = cache; this.cache = cache;
this.mapper = mapper; this.mapper = mapper;
@ -382,7 +375,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
offset += array.length; offset += array.length;
} }
cache.put(key.lhs, key.rhs.array(), valueBytes); cache.put(key, valueBytes);
} }
catch (IOException e) { catch (IOException e) {
throw Throwables.propagate(e); throw Throwables.propagate(e);

View File

@ -19,20 +19,63 @@
package com.metamx.druid.client.cache; package com.metamx.druid.client.cache;
import com.metamx.common.Pair; import com.google.common.base.Preconditions;
import java.nio.ByteBuffer; import java.util.Arrays;
import java.util.Map; import java.util.Map;
/** /**
*/ */
public interface CacheBroker public interface CacheBroker
{ {
public byte[] get(String identifier, byte[] key); public byte[] get(NamedKey key);
public void put(String identifier, byte[] key, byte[] value); public void put(NamedKey key, byte[] value);
public Map<Pair<String, ByteBuffer>, byte[]> getBulk(Iterable<Pair<String, ByteBuffer>> identifierKeyPairs); public Map<NamedKey, byte[]> getBulk(Iterable<NamedKey> keys);
public void close(String identifier); public void close(String namespace);
public CacheStats getStats(); public CacheStats getStats();
public class NamedKey
{
final public String namespace;
final public byte[] key;
public NamedKey(String namespace, byte[] key) {
Preconditions.checkArgument(namespace != null, "namespace must not be null");
Preconditions.checkArgument(key != null, "key must not be null");
this.namespace = namespace;
this.key = key;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
NamedKey namedKey = (NamedKey) o;
if (!namespace.equals(namedKey.namespace)) {
return false;
}
if (!Arrays.equals(key, namedKey.key)) {
return false;
}
return true;
}
@Override
public int hashCode()
{
int result = namespace.hashCode();
result = 31 * result + Arrays.hashCode(key);
return result;
}
}
} }

View File

@ -94,32 +94,32 @@ public class MapCacheBroker implements CacheBroker
} }
@Override @Override
public byte[] get(String identifier, byte[] key) public byte[] get(NamedKey key)
{ {
return provideCache(identifier).get(key); return provideCache(key.namespace).get(key.key);
} }
@Override @Override
public void put(String identifier, byte[] key, byte[] value) public void put(NamedKey key, byte[] value)
{ {
provideCache(identifier).put(key, value); provideCache(key.namespace).put(key.key, value);
} }
@Override @Override
public Map<Pair<String, ByteBuffer>, byte[]> getBulk(Iterable<Pair<String, ByteBuffer>> identifierKeyPairs) public Map<NamedKey, byte[]> getBulk(Iterable<NamedKey> keys)
{ {
Map<Pair<String, ByteBuffer>, byte[]> retVal = Maps.newHashMap(); Map<NamedKey, byte[]> retVal = Maps.newHashMap();
for(Pair<String, ByteBuffer> e : identifierKeyPairs) { for(NamedKey key : keys) {
retVal.put(e, provideCache(e.lhs).get(e.rhs.array())); retVal.put(key, provideCache(key.namespace).get(key.key));
} }
return retVal; return retVal;
} }
@Override @Override
public void close(String identifier) public void close(String namespace)
{ {
provideCache(identifier).close(); provideCache(namespace).close();
} }
private Cache provideCache(final String identifier) private Cache provideCache(final String identifier)
@ -148,7 +148,7 @@ public class MapCacheBroker implements CacheBroker
} }
return retVal; return retVal;
} }
throw new ISE("Cache for identifier[%s] is closed.", identifier); throw new ISE("Cache for namespace[%s] is closed.", identifier);
} }
@Override @Override
@ -160,7 +160,7 @@ public class MapCacheBroker implements CacheBroker
return; return;
} }
} }
throw new ISE("Cache for identifier[%s] is closed.", identifier); throw new ISE("Cache for namespace[%s] is closed.", identifier);
} }
@Override @Override

View File

@ -21,9 +21,7 @@ package com.metamx.druid.client.cache;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.metamx.common.Pair;
import net.iharder.base64.Base64; import net.iharder.base64.Base64;
import net.spy.memcached.AddrUtil; import net.spy.memcached.AddrUtil;
import net.spy.memcached.ConnectionFactoryBuilder; import net.spy.memcached.ConnectionFactoryBuilder;
@ -36,8 +34,6 @@ import net.spy.memcached.transcoders.SerializingTranscoder;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future; import java.util.concurrent.Future;
@ -103,9 +99,9 @@ public class MemcachedCacheBroker implements CacheBroker
} }
@Override @Override
public byte[] get(String identifier, byte[] key) public byte[] get(NamedKey key)
{ {
Future<Object> future = client.asyncGet(computeKey(identifier, key)); Future<Object> future = client.asyncGet(computeKeyString(key));
try { try {
byte[] bytes = (byte[]) future.get(timeout, TimeUnit.MILLISECONDS); byte[] bytes = (byte[]) future.get(timeout, TimeUnit.MILLISECONDS);
if(bytes != null) { if(bytes != null) {
@ -131,24 +127,24 @@ public class MemcachedCacheBroker implements CacheBroker
} }
@Override @Override
public void put(String identifier, byte[] key, byte[] value) public void put(NamedKey key, byte[] value)
{ {
client.set(computeKey(identifier, key), expiration, value); client.set(computeKeyString(key), expiration, value);
} }
@Override @Override
public Map<Pair<String, ByteBuffer>, byte[]> getBulk(Iterable<Pair<String, ByteBuffer>> identifierKeyPairs) public Map<NamedKey, byte[]> getBulk(Iterable<NamedKey> keys)
{ {
Map<String, Pair<String, ByteBuffer>> keyLookup = Maps.uniqueIndex( Map<String, NamedKey> keyLookup = Maps.uniqueIndex(
identifierKeyPairs, keys,
new Function<Pair<String, ByteBuffer>, String>() new Function<NamedKey, String>()
{ {
@Override @Override
public String apply( public String apply(
@Nullable Pair<String, ByteBuffer> input @Nullable NamedKey input
) )
{ {
return computeKey(input.lhs, input.rhs.array()); return computeKeyString(input);
} }
} }
); );
@ -165,7 +161,7 @@ public class MemcachedCacheBroker implements CacheBroker
missCount.addAndGet(keyLookup.size() - some.size()); missCount.addAndGet(keyLookup.size() - some.size());
hitCount.addAndGet(some.size()); hitCount.addAndGet(some.size());
Map<Pair<String, ByteBuffer>, byte[]> results = Maps.newHashMap(); Map<NamedKey, byte[]> results = Maps.newHashMap();
for(Map.Entry<String, Object> entry : some.entrySet()) { for(Map.Entry<String, Object> entry : some.entrySet()) {
results.put( results.put(
keyLookup.get(entry.getKey()), keyLookup.get(entry.getKey()),
@ -185,12 +181,12 @@ public class MemcachedCacheBroker implements CacheBroker
} }
@Override @Override
public void close(String identifier) public void close(String namespace)
{ {
// no resources to cleanup // no resources to cleanup
} }
private String computeKey(String identifier, byte[] key) { private static String computeKeyString(NamedKey key) {
return identifier + ":" + Base64.encodeBytes(key, Base64.DONT_BREAK_LINES); return key.namespace + ":" + Base64.encodeBytes(key.key, Base64.DONT_BREAK_LINES);
} }
} }

View File

@ -43,12 +43,12 @@ public class MapCacheBrokerTest
@Test @Test
public void testSanity() throws Exception public void testSanity() throws Exception
{ {
Assert.assertNull(cache.get("a", HI)); Assert.assertNull(cache.get(new CacheBroker.NamedKey("a", HI)));
Assert.assertEquals(0, baseMap.size()); Assert.assertEquals(0, baseMap.size());
put(cache, "a", HI, 1); put(cache, "a", HI, 1);
Assert.assertEquals(1, baseMap.size()); Assert.assertEquals(1, baseMap.size());
Assert.assertEquals(1, get(cache, "a", HI)); Assert.assertEquals(1, get(cache, "a", HI));
Assert.assertNull(cache.get("the", HI)); Assert.assertNull(cache.get(new CacheBroker.NamedKey("the", HI)));
put(cache, "the", HI, 2); put(cache, "the", HI, 2);
Assert.assertEquals(2, baseMap.size()); Assert.assertEquals(2, baseMap.size());
@ -58,26 +58,26 @@ public class MapCacheBrokerTest
put(cache, "the", HO, 10); put(cache, "the", HO, 10);
Assert.assertEquals(3, baseMap.size()); Assert.assertEquals(3, baseMap.size());
Assert.assertEquals(1, get(cache, "a", HI)); Assert.assertEquals(1, get(cache, "a", HI));
Assert.assertNull(cache.get("a", HO)); Assert.assertNull(cache.get(new CacheBroker.NamedKey("a", HO)));
Assert.assertEquals(2, get(cache, "the", HI)); Assert.assertEquals(2, get(cache, "the", HI));
Assert.assertEquals(10, get(cache, "the", HO)); Assert.assertEquals(10, get(cache, "the", HO));
cache.close("the"); cache.close("the");
Assert.assertEquals(1, baseMap.size()); Assert.assertEquals(1, baseMap.size());
Assert.assertEquals(1, get(cache, "a", HI)); Assert.assertEquals(1, get(cache, "a", HI));
Assert.assertNull(cache.get("a", HO)); Assert.assertNull(cache.get(new CacheBroker.NamedKey("a", HO)));
cache.close("a"); cache.close("a");
Assert.assertEquals(0, baseMap.size()); Assert.assertEquals(0, baseMap.size());
} }
public void put(CacheBroker cache, String identifier, byte[] key, Integer value) public void put(CacheBroker cache, String namespace, byte[] key, Integer value)
{ {
cache.put(identifier, key, Ints.toByteArray(value)); cache.put(new CacheBroker.NamedKey(namespace, key), Ints.toByteArray(value));
} }
public int get(CacheBroker cache, String identifier, byte[] key) public int get(CacheBroker cache, String namespace, byte[] key)
{ {
return Ints.fromByteArray(cache.get(identifier, key)); return Ints.fromByteArray(cache.get(new CacheBroker.NamedKey(namespace, key)));
} }
} }

View File

@ -4,7 +4,6 @@ import com.google.caliper.Param;
import com.google.caliper.Runner; import com.google.caliper.Runner;
import com.google.caliper.SimpleBenchmark; import com.google.caliper.SimpleBenchmark;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.metamx.common.Pair;
import net.spy.memcached.AddrUtil; import net.spy.memcached.AddrUtil;
import net.spy.memcached.ConnectionFactoryBuilder; import net.spy.memcached.ConnectionFactoryBuilder;
import net.spy.memcached.DefaultHashAlgorithm; import net.spy.memcached.DefaultHashAlgorithm;
@ -13,7 +12,6 @@ import net.spy.memcached.MemcachedClient;
import net.spy.memcached.MemcachedClientIF; import net.spy.memcached.MemcachedClientIF;
import net.spy.memcached.transcoders.SerializingTranscoder; import net.spy.memcached.transcoders.SerializingTranscoder;
import java.nio.ByteBuffer;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Random; import java.util.Random;
@ -22,7 +20,7 @@ import java.util.concurrent.TimeUnit;
public class MemcachedCacheBrokerBenchmark extends SimpleBenchmark public class MemcachedCacheBrokerBenchmark extends SimpleBenchmark
{ {
private static final String BASE_KEY = "test_2012-11-26T00:00:00.000Z_2012-11-27T00:00:00.000Z_2012-11-27T04:11:25.979Z_"; private static final String BASE_KEY = "test_2012-11-26T00:00:00.000Z_2012-11-27T00:00:00.000Z_2012-11-27T04:11:25.979Z_";
public static final String IDENTIFIER = "default"; public static final String NAMESPACE = "default";
private MemcachedCacheBroker cache; private MemcachedCacheBroker cache;
private MemcachedClientIF client; private MemcachedClientIF client;
@ -77,7 +75,7 @@ public class MemcachedCacheBrokerBenchmark extends SimpleBenchmark
for(int i = 0; i < reps; ++i) { for(int i = 0; i < reps; ++i) {
for(int k = 0; k < objectCount; ++k) { for(int k = 0; k < objectCount; ++k) {
String key = BASE_KEY + k; String key = BASE_KEY + k;
cache.put(IDENTIFIER, key.getBytes(), randBytes); cache.put(new CacheBroker.NamedKey(NAMESPACE, key.getBytes()), randBytes);
} }
// make sure the write queue is empty // make sure the write queue is empty
client.waitForQueues(1, TimeUnit.HOURS); client.waitForQueues(1, TimeUnit.HOURS);
@ -90,7 +88,7 @@ public class MemcachedCacheBrokerBenchmark extends SimpleBenchmark
for (int i = 0; i < reps; i++) { for (int i = 0; i < reps; i++) {
for(int k = 0; k < objectCount; ++k) { for(int k = 0; k < objectCount; ++k) {
String key = BASE_KEY + k; String key = BASE_KEY + k;
bytes = cache.get(IDENTIFIER, key.getBytes()); bytes = cache.get(new CacheBroker.NamedKey(NAMESPACE, key.getBytes()));
count += bytes.length; count += bytes.length;
} }
} }
@ -100,13 +98,13 @@ public class MemcachedCacheBrokerBenchmark extends SimpleBenchmark
public long timeBulkGetObjects(int reps) { public long timeBulkGetObjects(int reps) {
long count = 0; long count = 0;
for (int i = 0; i < reps; i++) { for (int i = 0; i < reps; i++) {
List<Pair<String, ByteBuffer>> keys = Lists.newArrayList(); List<CacheBroker.NamedKey> keys = Lists.newArrayList();
for(int k = 0; k < objectCount; ++k) { for(int k = 0; k < objectCount; ++k) {
String key = BASE_KEY + k; String key = BASE_KEY + k;
keys.add(Pair.of(IDENTIFIER, ByteBuffer.wrap(key.getBytes()))); keys.add(new CacheBroker.NamedKey(NAMESPACE, key.getBytes()));
} }
Map<Pair<String, ByteBuffer>, byte[]> results = cache.getBulk(keys); Map<CacheBroker.NamedKey, byte[]> results = cache.getBulk(keys);
for(Pair<String, ByteBuffer> key : keys) { for(CacheBroker.NamedKey key : keys) {
byte[] bytes = results.get(key); byte[] bytes = results.get(key);
count += bytes.length; count += bytes.length;
} }

View File

@ -68,10 +68,10 @@ public class MemcachedCacheBrokerTest
@Test @Test
public void testSanity() throws Exception public void testSanity() throws Exception
{ {
Assert.assertNull(cache.get("a", HI)); Assert.assertNull(cache.get(new CacheBroker.NamedKey("a", HI)));
put(cache, "a", HI, 1); put(cache, "a", HI, 1);
Assert.assertEquals(1, get(cache, "a", HI)); Assert.assertEquals(1, get(cache, "a", HI));
Assert.assertNull(cache.get("the", HI)); Assert.assertNull(cache.get(new CacheBroker.NamedKey("the", HI)));
put(cache, "the", HI, 2); put(cache, "the", HI, 2);
Assert.assertEquals(1, get(cache, "a", HI)); Assert.assertEquals(1, get(cache, "a", HI));
@ -79,13 +79,13 @@ public class MemcachedCacheBrokerTest
put(cache, "the", HO, 10); put(cache, "the", HO, 10);
Assert.assertEquals(1, get(cache, "a", HI)); Assert.assertEquals(1, get(cache, "a", HI));
Assert.assertNull(cache.get("a", HO)); Assert.assertNull(cache.get(new CacheBroker.NamedKey("a", HO)));
Assert.assertEquals(2, get(cache, "the", HI)); Assert.assertEquals(2, get(cache, "the", HI));
Assert.assertEquals(10, get(cache, "the", HO)); Assert.assertEquals(10, get(cache, "the", HO));
cache.close("the"); cache.close("the");
Assert.assertEquals(1, get(cache, "a", HI)); Assert.assertEquals(1, get(cache, "a", HI));
Assert.assertNull(cache.get("a", HO)); Assert.assertNull(cache.get(new CacheBroker.NamedKey("a", HO)));
cache.close("a"); cache.close("a");
} }
@ -93,15 +93,15 @@ public class MemcachedCacheBrokerTest
@Test @Test
public void testGetBulk() throws Exception public void testGetBulk() throws Exception
{ {
Assert.assertNull(cache.get("the", HI)); Assert.assertNull(cache.get(new CacheBroker.NamedKey("the", HI)));
put(cache, "the", HI, 2); put(cache, "the", HI, 2);
put(cache, "the", HO, 10); put(cache, "the", HO, 10);
Pair<String, ByteBuffer> key1 = Pair.of("the", ByteBuffer.wrap(HI)); CacheBroker.NamedKey key1 = new CacheBroker.NamedKey("the", HI);
Pair<String, ByteBuffer> key2 = Pair.of("the", ByteBuffer.wrap(HO)); CacheBroker.NamedKey key2 = new CacheBroker.NamedKey("the", HO);
Map<Pair<String, ByteBuffer>, byte[]> result = cache.getBulk( Map<CacheBroker.NamedKey, byte[]> result = cache.getBulk(
Lists.newArrayList( Lists.newArrayList(
key1, key1,
key2 key2
@ -112,14 +112,14 @@ public class MemcachedCacheBrokerTest
Assert.assertEquals(10, Ints.fromByteArray(result.get(key2))); Assert.assertEquals(10, Ints.fromByteArray(result.get(key2)));
} }
public void put(CacheBroker cache, String identifier, byte[] key, Integer value) public void put(CacheBroker cache, String namespace, byte[] key, Integer value)
{ {
cache.put(identifier, key, Ints.toByteArray(value)); cache.put(new CacheBroker.NamedKey(namespace, key), Ints.toByteArray(value));
} }
public int get(CacheBroker cache, String identifier, byte[] key) public int get(CacheBroker cache, String namespace, byte[] key)
{ {
return Ints.fromByteArray(cache.get(identifier, key)); return Ints.fromByteArray(cache.get(new CacheBroker.NamedKey(namespace, key)));
} }
} }