diff --git a/client/src/main/java/com/metamx/druid/client/CachingClusteredClient.java b/client/src/main/java/com/metamx/druid/client/CachingClusteredClient.java index 23f5ae6992e..625aef4d0e1 100644 --- a/client/src/main/java/com/metamx/druid/client/CachingClusteredClient.java +++ b/client/src/main/java/com/metamx/druid/client/CachingClusteredClient.java @@ -21,6 +21,7 @@ package com.metamx.druid.client; import com.google.common.base.Function; import com.google.common.base.Predicate; +import com.google.common.base.Predicates; import com.google.common.base.Supplier; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; @@ -133,7 +134,7 @@ public class CachingClusteredClient implements QueryRunner return Sequences.empty(); } - Map, Pair> segments = Maps.newLinkedHashMap(); + Map, CacheBroker.NamedKey> segments = Maps.newLinkedHashMap(); final byte[] queryCacheKey = (strategy != null) ? strategy.computeCacheKey(query) : null; @@ -156,21 +157,13 @@ public class CachingClusteredClient implements QueryRunner } } - Map, byte[]> cachedValues = cacheBroker.getBulk( - Iterables.filter(segments.values(), new Predicate>() - { - @Override - public boolean apply( - @Nullable Pair input - ) - { - return input != null; - } - }) + Map cachedValues = cacheBroker.getBulk( + Iterables.filter(segments.values(), Predicates.notNull()) ); - for(Pair segment : segments.keySet()) { - Pair segmentCacheKey = segments.get(segment); + for(Map.Entry, CacheBroker.NamedKey> entry : segments.entrySet()) { + Pair segment = entry.getKey(); + CacheBroker.NamedKey segmentCacheKey = entry.getValue(); final ServerSelector selector = segment.lhs; final SegmentDescriptor descriptor = segment.rhs; @@ -335,19 +328,19 @@ public class CachingClusteredClient implements QueryRunner ); } - private Pair computeSegmentCacheKey(String segmentIdentifier, SegmentDescriptor descriptor, byte[] queryCacheKey) + private CacheBroker.NamedKey computeSegmentCacheKey(String segmentIdentifier, SegmentDescriptor descriptor, byte[] queryCacheKey) { final Interval segmentQueryInterval = descriptor.getInterval(); final byte[] versionBytes = descriptor.getVersion().getBytes(); - return Pair.of( + return new CacheBroker.NamedKey( segmentIdentifier, ByteBuffer .allocate(16 + versionBytes.length + 4 + queryCacheKey.length) .putLong(segmentQueryInterval.getStartMillis()) .putLong(segmentQueryInterval.getEndMillis()) .put(versionBytes) .putInt(descriptor.getPartitionNumber()) - .put(queryCacheKey) + .put(queryCacheKey).array() ); } @@ -355,9 +348,9 @@ public class CachingClusteredClient implements QueryRunner { private final CacheBroker cache; private final ObjectMapper mapper; - private final Pair key; + private final CacheBroker.NamedKey key; - public CachePopulator(CacheBroker cache, ObjectMapper mapper, Pair key) + public CachePopulator(CacheBroker cache, ObjectMapper mapper, CacheBroker.NamedKey key) { this.cache = cache; this.mapper = mapper; @@ -382,7 +375,7 @@ public class CachingClusteredClient implements QueryRunner offset += array.length; } - cache.put(key.lhs, key.rhs.array(), valueBytes); + cache.put(key, valueBytes); } catch (IOException e) { throw Throwables.propagate(e); diff --git a/client/src/main/java/com/metamx/druid/client/cache/CacheBroker.java b/client/src/main/java/com/metamx/druid/client/cache/CacheBroker.java index b1dc548a962..adcba2e489c 100644 --- a/client/src/main/java/com/metamx/druid/client/cache/CacheBroker.java +++ b/client/src/main/java/com/metamx/druid/client/cache/CacheBroker.java @@ -19,20 +19,63 @@ package com.metamx.druid.client.cache; -import com.metamx.common.Pair; +import com.google.common.base.Preconditions; -import java.nio.ByteBuffer; +import java.util.Arrays; import java.util.Map; /** */ public interface CacheBroker { - public byte[] get(String identifier, byte[] key); - public void put(String identifier, byte[] key, byte[] value); - public Map, byte[]> getBulk(Iterable> identifierKeyPairs); + public byte[] get(NamedKey key); + public void put(NamedKey key, byte[] value); + public Map getBulk(Iterable keys); - public void close(String identifier); + public void close(String namespace); public CacheStats getStats(); + + public class NamedKey + { + final public String namespace; + final public byte[] key; + + public NamedKey(String namespace, byte[] key) { + Preconditions.checkArgument(namespace != null, "namespace must not be null"); + Preconditions.checkArgument(key != null, "key must not be null"); + this.namespace = namespace; + this.key = key; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + NamedKey namedKey = (NamedKey) o; + + if (!namespace.equals(namedKey.namespace)) { + return false; + } + if (!Arrays.equals(key, namedKey.key)) { + return false; + } + + return true; + } + + @Override + public int hashCode() + { + int result = namespace.hashCode(); + result = 31 * result + Arrays.hashCode(key); + return result; + } + } } diff --git a/client/src/main/java/com/metamx/druid/client/cache/MapCacheBroker.java b/client/src/main/java/com/metamx/druid/client/cache/MapCacheBroker.java index 85b9156c3fd..d541ab40a66 100644 --- a/client/src/main/java/com/metamx/druid/client/cache/MapCacheBroker.java +++ b/client/src/main/java/com/metamx/druid/client/cache/MapCacheBroker.java @@ -94,32 +94,32 @@ public class MapCacheBroker implements CacheBroker } @Override - public byte[] get(String identifier, byte[] key) + public byte[] get(NamedKey key) { - return provideCache(identifier).get(key); + return provideCache(key.namespace).get(key.key); } @Override - public void put(String identifier, byte[] key, byte[] value) + public void put(NamedKey key, byte[] value) { - provideCache(identifier).put(key, value); + provideCache(key.namespace).put(key.key, value); } @Override - public Map, byte[]> getBulk(Iterable> identifierKeyPairs) + public Map getBulk(Iterable keys) { - Map, byte[]> retVal = Maps.newHashMap(); + Map retVal = Maps.newHashMap(); - for(Pair e : identifierKeyPairs) { - retVal.put(e, provideCache(e.lhs).get(e.rhs.array())); + for(NamedKey key : keys) { + retVal.put(key, provideCache(key.namespace).get(key.key)); } return retVal; } @Override - public void close(String identifier) + public void close(String namespace) { - provideCache(identifier).close(); + provideCache(namespace).close(); } private Cache provideCache(final String identifier) @@ -148,7 +148,7 @@ public class MapCacheBroker implements CacheBroker } return retVal; } - throw new ISE("Cache for identifier[%s] is closed.", identifier); + throw new ISE("Cache for namespace[%s] is closed.", identifier); } @Override @@ -160,7 +160,7 @@ public class MapCacheBroker implements CacheBroker return; } } - throw new ISE("Cache for identifier[%s] is closed.", identifier); + throw new ISE("Cache for namespace[%s] is closed.", identifier); } @Override diff --git a/client/src/main/java/com/metamx/druid/client/cache/MemcachedCacheBroker.java b/client/src/main/java/com/metamx/druid/client/cache/MemcachedCacheBroker.java index 07953479337..fea391f7912 100644 --- a/client/src/main/java/com/metamx/druid/client/cache/MemcachedCacheBroker.java +++ b/client/src/main/java/com/metamx/druid/client/cache/MemcachedCacheBroker.java @@ -21,9 +21,7 @@ package com.metamx.druid.client.cache; import com.google.common.base.Function; import com.google.common.base.Throwables; -import com.google.common.collect.Iterables; import com.google.common.collect.Maps; -import com.metamx.common.Pair; import net.iharder.base64.Base64; import net.spy.memcached.AddrUtil; import net.spy.memcached.ConnectionFactoryBuilder; @@ -36,8 +34,6 @@ import net.spy.memcached.transcoders.SerializingTranscoder; import javax.annotation.Nullable; import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Collection; import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -103,9 +99,9 @@ public class MemcachedCacheBroker implements CacheBroker } @Override - public byte[] get(String identifier, byte[] key) + public byte[] get(NamedKey key) { - Future future = client.asyncGet(computeKey(identifier, key)); + Future future = client.asyncGet(computeKeyString(key)); try { byte[] bytes = (byte[]) future.get(timeout, TimeUnit.MILLISECONDS); if(bytes != null) { @@ -131,24 +127,24 @@ public class MemcachedCacheBroker implements CacheBroker } @Override - public void put(String identifier, byte[] key, byte[] value) + public void put(NamedKey key, byte[] value) { - client.set(computeKey(identifier, key), expiration, value); + client.set(computeKeyString(key), expiration, value); } @Override - public Map, byte[]> getBulk(Iterable> identifierKeyPairs) + public Map getBulk(Iterable keys) { - Map> keyLookup = Maps.uniqueIndex( - identifierKeyPairs, - new Function, String>() + Map keyLookup = Maps.uniqueIndex( + keys, + new Function() { @Override public String apply( - @Nullable Pair input + @Nullable NamedKey input ) { - return computeKey(input.lhs, input.rhs.array()); + return computeKeyString(input); } } ); @@ -165,7 +161,7 @@ public class MemcachedCacheBroker implements CacheBroker missCount.addAndGet(keyLookup.size() - some.size()); hitCount.addAndGet(some.size()); - Map, byte[]> results = Maps.newHashMap(); + Map results = Maps.newHashMap(); for(Map.Entry entry : some.entrySet()) { results.put( keyLookup.get(entry.getKey()), @@ -185,12 +181,12 @@ public class MemcachedCacheBroker implements CacheBroker } @Override - public void close(String identifier) + public void close(String namespace) { // no resources to cleanup } - private String computeKey(String identifier, byte[] key) { - return identifier + ":" + Base64.encodeBytes(key, Base64.DONT_BREAK_LINES); + private static String computeKeyString(NamedKey key) { + return key.namespace + ":" + Base64.encodeBytes(key.key, Base64.DONT_BREAK_LINES); } } diff --git a/client/src/test/java/com/metamx/druid/client/cache/MapCacheBrokerTest.java b/client/src/test/java/com/metamx/druid/client/cache/MapCacheBrokerTest.java index 2041e2dca05..35ea7ac42d9 100644 --- a/client/src/test/java/com/metamx/druid/client/cache/MapCacheBrokerTest.java +++ b/client/src/test/java/com/metamx/druid/client/cache/MapCacheBrokerTest.java @@ -43,12 +43,12 @@ public class MapCacheBrokerTest @Test public void testSanity() throws Exception { - Assert.assertNull(cache.get("a", HI)); + Assert.assertNull(cache.get(new CacheBroker.NamedKey("a", HI))); Assert.assertEquals(0, baseMap.size()); put(cache, "a", HI, 1); Assert.assertEquals(1, baseMap.size()); Assert.assertEquals(1, get(cache, "a", HI)); - Assert.assertNull(cache.get("the", HI)); + Assert.assertNull(cache.get(new CacheBroker.NamedKey("the", HI))); put(cache, "the", HI, 2); Assert.assertEquals(2, baseMap.size()); @@ -58,26 +58,26 @@ public class MapCacheBrokerTest put(cache, "the", HO, 10); Assert.assertEquals(3, baseMap.size()); Assert.assertEquals(1, get(cache, "a", HI)); - Assert.assertNull(cache.get("a", HO)); + Assert.assertNull(cache.get(new CacheBroker.NamedKey("a", HO))); Assert.assertEquals(2, get(cache, "the", HI)); Assert.assertEquals(10, get(cache, "the", HO)); cache.close("the"); Assert.assertEquals(1, baseMap.size()); Assert.assertEquals(1, get(cache, "a", HI)); - Assert.assertNull(cache.get("a", HO)); + Assert.assertNull(cache.get(new CacheBroker.NamedKey("a", HO))); cache.close("a"); Assert.assertEquals(0, baseMap.size()); } - public void put(CacheBroker cache, String identifier, byte[] key, Integer value) + public void put(CacheBroker cache, String namespace, byte[] key, Integer value) { - cache.put(identifier, key, Ints.toByteArray(value)); + cache.put(new CacheBroker.NamedKey(namespace, key), Ints.toByteArray(value)); } - public int get(CacheBroker cache, String identifier, byte[] key) + public int get(CacheBroker cache, String namespace, byte[] key) { - return Ints.fromByteArray(cache.get(identifier, key)); + return Ints.fromByteArray(cache.get(new CacheBroker.NamedKey(namespace, key))); } } diff --git a/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheBrokerBenchmark.java b/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheBrokerBenchmark.java index 14bd9fdc998..7c25f3ee954 100644 --- a/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheBrokerBenchmark.java +++ b/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheBrokerBenchmark.java @@ -4,7 +4,6 @@ import com.google.caliper.Param; import com.google.caliper.Runner; import com.google.caliper.SimpleBenchmark; import com.google.common.collect.Lists; -import com.metamx.common.Pair; import net.spy.memcached.AddrUtil; import net.spy.memcached.ConnectionFactoryBuilder; import net.spy.memcached.DefaultHashAlgorithm; @@ -13,7 +12,6 @@ import net.spy.memcached.MemcachedClient; import net.spy.memcached.MemcachedClientIF; import net.spy.memcached.transcoders.SerializingTranscoder; -import java.nio.ByteBuffer; import java.util.List; import java.util.Map; import java.util.Random; @@ -22,7 +20,7 @@ import java.util.concurrent.TimeUnit; public class MemcachedCacheBrokerBenchmark extends SimpleBenchmark { private static final String BASE_KEY = "test_2012-11-26T00:00:00.000Z_2012-11-27T00:00:00.000Z_2012-11-27T04:11:25.979Z_"; - public static final String IDENTIFIER = "default"; + public static final String NAMESPACE = "default"; private MemcachedCacheBroker cache; private MemcachedClientIF client; @@ -77,7 +75,7 @@ public class MemcachedCacheBrokerBenchmark extends SimpleBenchmark for(int i = 0; i < reps; ++i) { for(int k = 0; k < objectCount; ++k) { String key = BASE_KEY + k; - cache.put(IDENTIFIER, key.getBytes(), randBytes); + cache.put(new CacheBroker.NamedKey(NAMESPACE, key.getBytes()), randBytes); } // make sure the write queue is empty client.waitForQueues(1, TimeUnit.HOURS); @@ -90,7 +88,7 @@ public class MemcachedCacheBrokerBenchmark extends SimpleBenchmark for (int i = 0; i < reps; i++) { for(int k = 0; k < objectCount; ++k) { String key = BASE_KEY + k; - bytes = cache.get(IDENTIFIER, key.getBytes()); + bytes = cache.get(new CacheBroker.NamedKey(NAMESPACE, key.getBytes())); count += bytes.length; } } @@ -100,13 +98,13 @@ public class MemcachedCacheBrokerBenchmark extends SimpleBenchmark public long timeBulkGetObjects(int reps) { long count = 0; for (int i = 0; i < reps; i++) { - List> keys = Lists.newArrayList(); + List keys = Lists.newArrayList(); for(int k = 0; k < objectCount; ++k) { String key = BASE_KEY + k; - keys.add(Pair.of(IDENTIFIER, ByteBuffer.wrap(key.getBytes()))); + keys.add(new CacheBroker.NamedKey(NAMESPACE, key.getBytes())); } - Map, byte[]> results = cache.getBulk(keys); - for(Pair key : keys) { + Map results = cache.getBulk(keys); + for(CacheBroker.NamedKey key : keys) { byte[] bytes = results.get(key); count += bytes.length; } diff --git a/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheBrokerTest.java b/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheBrokerTest.java index 80ddd0072b5..31030260df7 100644 --- a/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheBrokerTest.java +++ b/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheBrokerTest.java @@ -68,10 +68,10 @@ public class MemcachedCacheBrokerTest @Test public void testSanity() throws Exception { - Assert.assertNull(cache.get("a", HI)); + Assert.assertNull(cache.get(new CacheBroker.NamedKey("a", HI))); put(cache, "a", HI, 1); Assert.assertEquals(1, get(cache, "a", HI)); - Assert.assertNull(cache.get("the", HI)); + Assert.assertNull(cache.get(new CacheBroker.NamedKey("the", HI))); put(cache, "the", HI, 2); Assert.assertEquals(1, get(cache, "a", HI)); @@ -79,13 +79,13 @@ public class MemcachedCacheBrokerTest put(cache, "the", HO, 10); Assert.assertEquals(1, get(cache, "a", HI)); - Assert.assertNull(cache.get("a", HO)); + Assert.assertNull(cache.get(new CacheBroker.NamedKey("a", HO))); Assert.assertEquals(2, get(cache, "the", HI)); Assert.assertEquals(10, get(cache, "the", HO)); cache.close("the"); Assert.assertEquals(1, get(cache, "a", HI)); - Assert.assertNull(cache.get("a", HO)); + Assert.assertNull(cache.get(new CacheBroker.NamedKey("a", HO))); cache.close("a"); } @@ -93,15 +93,15 @@ public class MemcachedCacheBrokerTest @Test public void testGetBulk() throws Exception { - Assert.assertNull(cache.get("the", HI)); + Assert.assertNull(cache.get(new CacheBroker.NamedKey("the", HI))); put(cache, "the", HI, 2); put(cache, "the", HO, 10); - Pair key1 = Pair.of("the", ByteBuffer.wrap(HI)); - Pair key2 = Pair.of("the", ByteBuffer.wrap(HO)); + CacheBroker.NamedKey key1 = new CacheBroker.NamedKey("the", HI); + CacheBroker.NamedKey key2 = new CacheBroker.NamedKey("the", HO); - Map, byte[]> result = cache.getBulk( + Map result = cache.getBulk( Lists.newArrayList( key1, key2 @@ -112,14 +112,14 @@ public class MemcachedCacheBrokerTest Assert.assertEquals(10, Ints.fromByteArray(result.get(key2))); } - public void put(CacheBroker cache, String identifier, byte[] key, Integer value) + public void put(CacheBroker cache, String namespace, byte[] key, Integer value) { - cache.put(identifier, key, Ints.toByteArray(value)); + cache.put(new CacheBroker.NamedKey(namespace, key), Ints.toByteArray(value)); } - public int get(CacheBroker cache, String identifier, byte[] key) + public int get(CacheBroker cache, String namespace, byte[] key) { - return Ints.fromByteArray(cache.get(identifier, key)); + return Ints.fromByteArray(cache.get(new CacheBroker.NamedKey(namespace, key))); } }