From a2090411a3a821469b06fb6e978ec331b1ef3133 Mon Sep 17 00:00:00 2001 From: xvrl Date: Mon, 7 Jan 2013 16:54:21 -0800 Subject: [PATCH 1/9] modify cacheBroker interface to implement bulk-get --- .../druid/client/CachingClusteredClient.java | 137 ++++++++++-------- .../com/metamx/druid/client/cache/Cache.java | 31 ---- .../druid/client/cache/CacheBroker.java | 12 +- .../druid/client/cache/MapCacheBroker.java | 43 +++++- .../client/cache/MemcachedCacheBroker.java | 125 +++++++++++----- .../client/cache/MapCacheBrokerTest.java | 47 +++--- .../cache/MemcachedCacheBrokerBenchmark.java | 33 ++++- .../cache/MemcachedCacheBrokerTest.java | 47 +++--- 8 files changed, 288 insertions(+), 187 deletions(-) delete mode 100644 client/src/main/java/com/metamx/druid/client/cache/Cache.java diff --git a/client/src/main/java/com/metamx/druid/client/CachingClusteredClient.java b/client/src/main/java/com/metamx/druid/client/CachingClusteredClient.java index 47376dd2e68..23f5ae6992e 100644 --- a/client/src/main/java/com/metamx/druid/client/CachingClusteredClient.java +++ b/client/src/main/java/com/metamx/druid/client/CachingClusteredClient.java @@ -20,6 +20,7 @@ package com.metamx.druid.client; import com.google.common.base.Function; +import com.google.common.base.Predicate; import com.google.common.base.Supplier; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; @@ -40,7 +41,6 @@ import com.metamx.druid.Query; import com.metamx.druid.TimelineObjectHolder; import com.metamx.druid.VersionedIntervalTimeline; import com.metamx.druid.aggregation.AggregatorFactory; -import com.metamx.druid.client.cache.Cache; import com.metamx.druid.client.cache.CacheBroker; import com.metamx.druid.client.selector.ServerSelector; import com.metamx.druid.partition.PartitionChunk; @@ -57,6 +57,7 @@ import org.codehaus.jackson.map.ObjectMapper; import org.joda.time.DateTime; import org.joda.time.Interval; +import javax.annotation.Nullable; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -98,7 +99,7 @@ public class CachingClusteredClient implements QueryRunner @Override public ServerView.CallbackAction segmentRemoved(DruidServer server, DataSegment segment) { - CachingClusteredClient.this.cacheBroker.provideCache(segment.getIdentifier()).close(); + CachingClusteredClient.this.cacheBroker.close(segment.getIdentifier()); return ServerView.CallbackAction.CONTINUE; } } @@ -111,7 +112,8 @@ public class CachingClusteredClient implements QueryRunner final QueryToolChest> toolChest = warehouse.getToolChest(query); final CacheStrategy> strategy = toolChest.getCacheStrategy(query); - final Map> segs = Maps.newTreeMap(); + final Map> serverSegments = Maps.newTreeMap(); + final List> cachedResults = Lists.newArrayList(); final Map cachePopulatorMap = Maps.newHashMap(); @@ -131,10 +133,9 @@ public class CachingClusteredClient implements QueryRunner return Sequences.empty(); } - byte[] queryCacheKey = null; - if (strategy != null) { - queryCacheKey = strategy.computeCacheKey(query); - } + Map, Pair> segments = Maps.newLinkedHashMap(); + + final byte[] queryCacheKey = (strategy != null) ? strategy.computeCacheKey(query) : null; for (Interval interval : rewrittenQuery.getIntervals()) { List> serversLookup = timeline.lookup(interval); @@ -146,55 +147,61 @@ public class CachingClusteredClient implements QueryRunner holder.getInterval(), holder.getVersion(), chunk.getChunkNumber() ); - if (queryCacheKey == null) { - final DruidServer server = selector.pick(); - List descriptors = segs.get(server); - - if (descriptors == null) { - descriptors = Lists.newArrayList(); - segs.put(server, descriptors); - } - - descriptors.add(descriptor); - } - else { - final Interval segmentQueryInterval = holder.getInterval(); - final byte[] versionBytes = descriptor.getVersion().getBytes(); - - final byte[] cacheKey = ByteBuffer - .allocate(16 + versionBytes.length + 4 + queryCacheKey.length) - .putLong(segmentQueryInterval.getStartMillis()) - .putLong(segmentQueryInterval.getEndMillis()) - .put(versionBytes) - .putInt(descriptor.getPartitionNumber()) - .put(queryCacheKey) - .array(); - final String segmentIdentifier = selector.getSegment().getIdentifier(); - final Cache cache = cacheBroker.provideCache(segmentIdentifier); - final byte[] cachedValue = cache.get(cacheKey); - - if (useCache && cachedValue != null) { - cachedResults.add(Pair.of(segmentQueryInterval.getStart(), cachedValue)); - } else { - final DruidServer server = selector.pick(); - List descriptors = segs.get(server); - - if (descriptors == null) { - descriptors = Lists.newArrayList(); - segs.put(server, descriptors); - } - - descriptors.add(descriptor); - cachePopulatorMap.put( - String.format("%s_%s", segmentIdentifier, segmentQueryInterval), - new CachePopulator(cache, objectMapper, cacheKey) - ); - } - } + segments.put( + Pair.of(selector, descriptor), + queryCacheKey == null ? null : + computeSegmentCacheKey(selector.getSegment().getIdentifier(), descriptor, queryCacheKey) + ); } } } + Map, byte[]> cachedValues = cacheBroker.getBulk( + Iterables.filter(segments.values(), new Predicate>() + { + @Override + public boolean apply( + @Nullable Pair input + ) + { + return input != null; + } + }) + ); + + for(Pair segment : segments.keySet()) { + Pair segmentCacheKey = segments.get(segment); + + final ServerSelector selector = segment.lhs; + final SegmentDescriptor descriptor = segment.rhs; + final Interval segmentQueryInterval = descriptor.getInterval(); + + final byte[] cachedValue = segmentCacheKey == null ? null : cachedValues.get(segmentCacheKey); + + if (useCache && cachedValue != null) { + cachedResults.add(Pair.of(segmentQueryInterval.getStart(), cachedValue)); + } else { + final DruidServer server = selector.pick(); + List descriptors = serverSegments.get(server); + + if (descriptors == null) { + descriptors = Lists.newArrayList(); + serverSegments.put(server, descriptors); + } + + descriptors.add(descriptor); + + if(segmentCacheKey != null) { + final String segmentIdentifier = selector.getSegment().getIdentifier(); + cachePopulatorMap.put( + String.format("%s_%s", segmentIdentifier, segmentQueryInterval), + new CachePopulator(cacheBroker, objectMapper, segmentCacheKey) + ); + } + } + } + + return new LazySequence( new Supplier>() { @@ -264,7 +271,7 @@ public class CachingClusteredClient implements QueryRunner @SuppressWarnings("unchecked") private void addSequencesFromServer(ArrayList>> listOfSequences) { - for (Map.Entry> entry : segs.entrySet()) { + for (Map.Entry> entry : serverSegments.entrySet()) { final DruidServer server = entry.getKey(); final List descriptors = entry.getValue(); @@ -328,13 +335,29 @@ public class CachingClusteredClient implements QueryRunner ); } + private Pair computeSegmentCacheKey(String segmentIdentifier, SegmentDescriptor descriptor, byte[] queryCacheKey) + { + final Interval segmentQueryInterval = descriptor.getInterval(); + final byte[] versionBytes = descriptor.getVersion().getBytes(); + + return Pair.of( + segmentIdentifier, ByteBuffer + .allocate(16 + versionBytes.length + 4 + queryCacheKey.length) + .putLong(segmentQueryInterval.getStartMillis()) + .putLong(segmentQueryInterval.getEndMillis()) + .put(versionBytes) + .putInt(descriptor.getPartitionNumber()) + .put(queryCacheKey) + ); + } + private static class CachePopulator { - private final Cache cache; + private final CacheBroker cache; private final ObjectMapper mapper; - private final byte[] key; + private final Pair key; - public CachePopulator(Cache cache, ObjectMapper mapper, byte[] key) + public CachePopulator(CacheBroker cache, ObjectMapper mapper, Pair key) { this.cache = cache; this.mapper = mapper; @@ -359,7 +382,7 @@ public class CachingClusteredClient implements QueryRunner offset += array.length; } - cache.put(key, valueBytes); + cache.put(key.lhs, key.rhs.array(), valueBytes); } catch (IOException e) { throw Throwables.propagate(e); diff --git a/client/src/main/java/com/metamx/druid/client/cache/Cache.java b/client/src/main/java/com/metamx/druid/client/cache/Cache.java deleted file mode 100644 index e7907c9548f..00000000000 --- a/client/src/main/java/com/metamx/druid/client/cache/Cache.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright (C) 2012 Metamarkets Group Inc. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; either version 2 - * of the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - */ - -package com.metamx.druid.client.cache; - -/** - * An interface to limit the operations that can be done on a Cache so that it is easier to reason about what - * is actually going to be done. - */ -public interface Cache -{ - public byte[] get(byte[] key); - public void put(byte[] key, byte[] value); - public void close(); -} diff --git a/client/src/main/java/com/metamx/druid/client/cache/CacheBroker.java b/client/src/main/java/com/metamx/druid/client/cache/CacheBroker.java index 5f028c06e72..b1dc548a962 100644 --- a/client/src/main/java/com/metamx/druid/client/cache/CacheBroker.java +++ b/client/src/main/java/com/metamx/druid/client/cache/CacheBroker.java @@ -19,10 +19,20 @@ package com.metamx.druid.client.cache; +import com.metamx.common.Pair; + +import java.nio.ByteBuffer; +import java.util.Map; + /** */ public interface CacheBroker { + public byte[] get(String identifier, byte[] key); + public void put(String identifier, byte[] key, byte[] value); + public Map, byte[]> getBulk(Iterable> identifierKeyPairs); + + public void close(String identifier); + public CacheStats getStats(); - public Cache provideCache(String identifier); } diff --git a/client/src/main/java/com/metamx/druid/client/cache/MapCacheBroker.java b/client/src/main/java/com/metamx/druid/client/cache/MapCacheBroker.java index d8ec202021a..85b9156c3fd 100644 --- a/client/src/main/java/com/metamx/druid/client/cache/MapCacheBroker.java +++ b/client/src/main/java/com/metamx/druid/client/cache/MapCacheBroker.java @@ -22,6 +22,7 @@ package com.metamx.druid.client.cache; import com.google.common.collect.Maps; import com.google.common.primitives.Ints; import com.metamx.common.ISE; +import com.metamx.common.Pair; import java.nio.ByteBuffer; import java.util.Collections; @@ -34,6 +35,17 @@ import java.util.concurrent.atomic.AtomicLong; */ public class MapCacheBroker implements CacheBroker { + /** + * An interface to limit the operations that can be done on a Cache so that it is easier to reason about what + * is actually going to be done. + */ + public interface Cache + { + public byte[] get(byte[] key); + public void put(byte[] key, byte[] value); + public void close(); + } + private final Map baseMap; private final ByteCountingLRUMap byteCountingLRUMap; @@ -68,7 +80,6 @@ public class MapCacheBroker implements CacheBroker ids = new AtomicInteger(); } - @Override public CacheStats getStats() { @@ -83,7 +94,35 @@ public class MapCacheBroker implements CacheBroker } @Override - public Cache provideCache(final String identifier) + public byte[] get(String identifier, byte[] key) + { + return provideCache(identifier).get(key); + } + + @Override + public void put(String identifier, byte[] key, byte[] value) + { + provideCache(identifier).put(key, value); + } + + @Override + public Map, byte[]> getBulk(Iterable> identifierKeyPairs) + { + Map, byte[]> retVal = Maps.newHashMap(); + + for(Pair e : identifierKeyPairs) { + retVal.put(e, provideCache(e.lhs).get(e.rhs.array())); + } + return retVal; + } + + @Override + public void close(String identifier) + { + provideCache(identifier).close(); + } + + private Cache provideCache(final String identifier) { synchronized (cacheCache) { final Cache cachedCache = cacheCache.get(identifier); diff --git a/client/src/main/java/com/metamx/druid/client/cache/MemcachedCacheBroker.java b/client/src/main/java/com/metamx/druid/client/cache/MemcachedCacheBroker.java index 2f1af877d8c..f2b709de8e5 100644 --- a/client/src/main/java/com/metamx/druid/client/cache/MemcachedCacheBroker.java +++ b/client/src/main/java/com/metamx/druid/client/cache/MemcachedCacheBroker.java @@ -19,7 +19,11 @@ package com.metamx.druid.client.cache; +import com.google.common.base.Function; import com.google.common.base.Throwables; +import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; +import com.metamx.common.Pair; import net.iharder.base64.Base64; import net.spy.memcached.AddrUtil; import net.spy.memcached.ConnectionFactoryBuilder; @@ -27,9 +31,14 @@ import net.spy.memcached.DefaultHashAlgorithm; import net.spy.memcached.FailureMode; import net.spy.memcached.MemcachedClient; import net.spy.memcached.MemcachedClientIF; +import net.spy.memcached.internal.BulkFuture; import net.spy.memcached.transcoders.SerializingTranscoder; +import javax.annotation.Nullable; import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -94,52 +103,92 @@ public class MemcachedCacheBroker implements CacheBroker } @Override - public Cache provideCache(final String identifier) + public byte[] get(String identifier, byte[] key) { - return new Cache() - { - @Override - public byte[] get(byte[] key) - { - Future future = client.asyncGet(computeKey(identifier, key)); - try { - byte[] bytes = (byte[]) future.get(timeout, TimeUnit.MILLISECONDS); - if(bytes != null) { - hitCount.incrementAndGet(); + Future future = client.asyncGet(computeKey(identifier, key)); + try { + byte[] bytes = (byte[]) future.get(timeout, TimeUnit.MILLISECONDS); + if(bytes != null) { + hitCount.incrementAndGet(); + } + else { + missCount.incrementAndGet(); + } + return bytes; + } + catch(TimeoutException e) { + timeoutCount.incrementAndGet(); + future.cancel(false); + return null; + } + catch(InterruptedException e) { + throw Throwables.propagate(e); + } + catch(ExecutionException e) { + throw Throwables.propagate(e); + } + } + + @Override + public void put(String identifier, byte[] key, byte[] value) + { + client.set(computeKey(identifier, key), expiration, value); + } + + @Override + public Map, byte[]> getBulk(Iterable> identifierKeyPairs) + { + Map> keyLookup = Maps.uniqueIndex( + identifierKeyPairs, + new Function, String>() + { + @Override + public String apply( + @Nullable Pair input + ) + { + return computeKey(input.lhs, input.rhs.array()); } - else { - missCount.incrementAndGet(); - } - return bytes; - } - catch(TimeoutException e) { - timeoutCount.incrementAndGet(); - future.cancel(false); - return null; - } - catch(InterruptedException e) { - throw Throwables.propagate(e); - } - catch(ExecutionException e) { - throw Throwables.propagate(e); } + ); + + BulkFuture> future = client.asyncGetBulk(keyLookup.keySet()); + + try { + Map some = future.getSome(timeout, TimeUnit.MILLISECONDS); + + if(future.isTimeout()) { + future.cancel(false); + timeoutCount.incrementAndGet(); + } + missCount.addAndGet(keyLookup.size() - some.size()); + hitCount.addAndGet(some.size()); + + Map, byte[]> results = Maps.newHashMap(); + for(Map.Entry entry : some.entrySet()) { + results.put( + keyLookup.get(entry.getKey()), + (byte[])entry.getValue() + ); } - @Override - public void put(byte[] key, byte[] value) - { - client.set(computeKey(identifier, key), expiration, value); - } + return results; + } + catch(InterruptedException e) { + throw Throwables.propagate(e); + } + catch(ExecutionException e) { + throw Throwables.propagate(e); + } + } - @Override - public void close() - { - // no resources to cleanup - } - }; + @Override + public void close(String identifier) + { + // no resources to cleanup } private String computeKey(String identifier, byte[] key) { - return identifier + Base64.encodeBytes(key, Base64.DONT_BREAK_LINES); + return identifier + ":" + Base64.encodeBytes(key, Base64.DONT_BREAK_LINES); } } diff --git a/client/src/test/java/com/metamx/druid/client/cache/MapCacheBrokerTest.java b/client/src/test/java/com/metamx/druid/client/cache/MapCacheBrokerTest.java index 4338a23a10d..2041e2dca05 100644 --- a/client/src/test/java/com/metamx/druid/client/cache/MapCacheBrokerTest.java +++ b/client/src/test/java/com/metamx/druid/client/cache/MapCacheBrokerTest.java @@ -31,56 +31,53 @@ public class MapCacheBrokerTest private static final byte[] HI = "hi".getBytes(); private static final byte[] HO = "ho".getBytes(); private ByteCountingLRUMap baseMap; - private MapCacheBroker broker; + private MapCacheBroker cache; @Before public void setUp() throws Exception { baseMap = new ByteCountingLRUMap(1024 * 1024); - broker = new MapCacheBroker(baseMap); + cache = new MapCacheBroker(baseMap); } @Test public void testSanity() throws Exception { - Cache aCache = broker.provideCache("a"); - Cache theCache = broker.provideCache("the"); - - Assert.assertNull(aCache.get(HI)); + Assert.assertNull(cache.get("a", HI)); Assert.assertEquals(0, baseMap.size()); - put(aCache, HI, 1); + put(cache, "a", HI, 1); Assert.assertEquals(1, baseMap.size()); - Assert.assertEquals(1, get(aCache, HI)); - Assert.assertNull(theCache.get(HI)); + Assert.assertEquals(1, get(cache, "a", HI)); + Assert.assertNull(cache.get("the", HI)); - put(theCache, HI, 2); + put(cache, "the", HI, 2); Assert.assertEquals(2, baseMap.size()); - Assert.assertEquals(1, get(aCache, HI)); - Assert.assertEquals(2, get(theCache, HI)); + Assert.assertEquals(1, get(cache, "a", HI)); + Assert.assertEquals(2, get(cache, "the", HI)); - put(theCache, HO, 10); + put(cache, "the", HO, 10); Assert.assertEquals(3, baseMap.size()); - Assert.assertEquals(1, get(aCache, HI)); - Assert.assertNull(aCache.get(HO)); - Assert.assertEquals(2, get(theCache, HI)); - Assert.assertEquals(10, get(theCache, HO)); + Assert.assertEquals(1, get(cache, "a", HI)); + Assert.assertNull(cache.get("a", HO)); + Assert.assertEquals(2, get(cache, "the", HI)); + Assert.assertEquals(10, get(cache, "the", HO)); - theCache.close(); + cache.close("the"); Assert.assertEquals(1, baseMap.size()); - Assert.assertEquals(1, get(aCache, HI)); - Assert.assertNull(aCache.get(HO)); + Assert.assertEquals(1, get(cache, "a", HI)); + Assert.assertNull(cache.get("a", HO)); - aCache.close(); + cache.close("a"); Assert.assertEquals(0, baseMap.size()); } - public void put(Cache cache, byte[] key, Integer value) + public void put(CacheBroker cache, String identifier, byte[] key, Integer value) { - cache.put(key, Ints.toByteArray(value)); + cache.put(identifier, key, Ints.toByteArray(value)); } - public int get(Cache cache, byte[] key) + public int get(CacheBroker cache, String identifier, byte[] key) { - return Ints.fromByteArray(cache.get(key)); + return Ints.fromByteArray(cache.get(identifier, key)); } } diff --git a/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheBrokerBenchmark.java b/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheBrokerBenchmark.java index d1839098e8b..1aa5ed205e7 100644 --- a/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheBrokerBenchmark.java +++ b/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheBrokerBenchmark.java @@ -3,6 +3,8 @@ package com.metamx.druid.client.cache; import com.google.caliper.Param; import com.google.caliper.Runner; import com.google.caliper.SimpleBenchmark; +import com.google.common.collect.Lists; +import com.metamx.common.Pair; import net.spy.memcached.AddrUtil; import net.spy.memcached.ConnectionFactoryBuilder; import net.spy.memcached.DefaultHashAlgorithm; @@ -11,17 +13,20 @@ import net.spy.memcached.MemcachedClient; import net.spy.memcached.MemcachedClientIF; import net.spy.memcached.transcoders.SerializingTranscoder; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; import java.util.Random; import java.util.concurrent.TimeUnit; public class MemcachedCacheBrokerBenchmark extends SimpleBenchmark { private static final String BASE_KEY = "test_2012-11-26T00:00:00.000Z_2012-11-27T00:00:00.000Z_2012-11-27T04:11:25.979Z_"; + public static final String IDENTIFIER = "default"; - private MemcachedCacheBroker broker; + private MemcachedCacheBroker cache; private MemcachedClientIF client; - private Cache cache; private static byte[] randBytes; @Param({"localhost:11211"}) String hosts; @@ -53,14 +58,12 @@ public class MemcachedCacheBrokerBenchmark extends SimpleBenchmark AddrUtil.getAddresses(hosts) ); - broker = new MemcachedCacheBroker( + cache = new MemcachedCacheBroker( client, 500, // 500 milliseconds 3600 // 1 hour ); - cache = broker.provideCache("default"); - randBytes = new byte[objectSize * 1024]; new Random(0).nextBytes(randBytes); @@ -77,7 +80,7 @@ public class MemcachedCacheBrokerBenchmark extends SimpleBenchmark for(int i = 0; i < reps; ++i) { for(int k = 0; k < objectCount; ++k) { String key = BASE_KEY + i; - cache.put(key.getBytes(), randBytes); + cache.put(IDENTIFIER, key.getBytes(), randBytes); } // make sure the write queue is empty client.waitForQueues(1, TimeUnit.HOURS); @@ -89,14 +92,28 @@ public class MemcachedCacheBrokerBenchmark extends SimpleBenchmark long count = 0; for (int i = 0; i < reps; i++) { for(int k = 0; k < objectCount; ++k) { - String key = BASE_KEY + i; - bytes = cache.get(key.getBytes()); + String key = BASE_KEY + k; + bytes = cache.get(IDENTIFIER, key.getBytes()); count += bytes.length; } } return count; } + public long timeBulkGetObjects(int reps) { + long count = 0; + for (int i = 0; i < reps; i++) { + List> keys = Lists.newArrayList(); + for(int k = 0; k < objectCount; ++k) { + String key = BASE_KEY + k; + keys.add(Pair.of(IDENTIFIER, ByteBuffer.wrap(key.getBytes()))); + } + Map, byte[]> results = cache.getBulk(keys); + for(byte[] bytes : results.values()) count += bytes.length; + } + return count; + } + public static void main(String[] args) throws Exception { Runner.main(MemcachedCacheBrokerBenchmark.class, args); } diff --git a/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheBrokerTest.java b/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheBrokerTest.java index 3cb2dba09b8..9bfb928a949 100644 --- a/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheBrokerTest.java +++ b/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheBrokerTest.java @@ -51,51 +51,48 @@ public class MemcachedCacheBrokerTest { private static final byte[] HI = "hi".getBytes(); private static final byte[] HO = "ho".getBytes(); - private MemcachedCacheBroker broker; + private MemcachedCacheBroker cache; @Before public void setUp() throws Exception { MemcachedClientIF client = new MockMemcachedClient(); - broker = new MemcachedCacheBroker(client, 500, 3600); + cache = new MemcachedCacheBroker(client, 500, 3600); } @Test public void testSanity() throws Exception { - Cache aCache = broker.provideCache("a"); - Cache theCache = broker.provideCache("the"); + Assert.assertNull(cache.get("a", HI)); + put(cache, "a", HI, 1); + Assert.assertEquals(1, get(cache, "a", HI)); + Assert.assertNull(cache.get("the", HI)); - Assert.assertNull(aCache.get(HI)); - put(aCache, HI, 1); - Assert.assertEquals(1, get(aCache, HI)); - Assert.assertNull(theCache.get(HI)); + put(cache, "the", HI, 2); + Assert.assertEquals(1, get(cache, "a", HI)); + Assert.assertEquals(2, get(cache, "the", HI)); - put(theCache, HI, 2); - Assert.assertEquals(1, get(aCache, HI)); - Assert.assertEquals(2, get(theCache, HI)); + put(cache, "the", HO, 10); + Assert.assertEquals(1, get(cache, "a", HI)); + Assert.assertNull(cache.get("a", HO)); + Assert.assertEquals(2, get(cache, "the", HI)); + Assert.assertEquals(10, get(cache, "the", HO)); - put(theCache, HO, 10); - Assert.assertEquals(1, get(aCache, HI)); - Assert.assertNull(aCache.get(HO)); - Assert.assertEquals(2, get(theCache, HI)); - Assert.assertEquals(10, get(theCache, HO)); + cache.close("the"); + Assert.assertEquals(1, get(cache, "a", HI)); + Assert.assertNull(cache.get("a", HO)); - theCache.close(); - Assert.assertEquals(1, get(aCache, HI)); - Assert.assertNull(aCache.get(HO)); - - aCache.close(); + cache.close("a"); } - public void put(Cache cache, byte[] key, Integer value) + public void put(CacheBroker cache, String identifier, byte[] key, Integer value) { - cache.put(key, Ints.toByteArray(value)); + cache.put(identifier, key, Ints.toByteArray(value)); } - public int get(Cache cache, byte[] key) + public int get(CacheBroker cache, String identifier, byte[] key) { - return Ints.fromByteArray(cache.get(key)); + return Ints.fromByteArray(cache.get(identifier, key)); } } From e2788187fbbd624fd91bd006e94bcd7b17d6902c Mon Sep 17 00:00:00 2001 From: xvrl Date: Wed, 16 Jan 2013 16:02:51 -0800 Subject: [PATCH 2/9] don't let timeout skew benchmark stats --- .../druid/client/cache/MemcachedCacheBrokerBenchmark.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheBrokerBenchmark.java b/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheBrokerBenchmark.java index 1aa5ed205e7..d0b34af57df 100644 --- a/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheBrokerBenchmark.java +++ b/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheBrokerBenchmark.java @@ -60,7 +60,7 @@ public class MemcachedCacheBrokerBenchmark extends SimpleBenchmark cache = new MemcachedCacheBroker( client, - 500, // 500 milliseconds + 30000, // 30 seconds 3600 // 1 hour ); From dcaa77a883bd1690dda721a96621ad15f6e6f656 Mon Sep 17 00:00:00 2001 From: xvrl Date: Wed, 16 Jan 2013 19:15:43 -0800 Subject: [PATCH 3/9] implement bulk get test --- .../cache/MemcachedCacheBrokerTest.java | 93 ++++++++++++++++++- 1 file changed, 89 insertions(+), 4 deletions(-) diff --git a/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheBrokerTest.java b/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheBrokerTest.java index 9bfb928a949..80ddd0072b5 100644 --- a/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheBrokerTest.java +++ b/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheBrokerTest.java @@ -19,7 +19,10 @@ package com.metamx.druid.client.cache; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.primitives.Ints; +import com.metamx.common.Pair; import net.spy.memcached.CASResponse; import net.spy.memcached.CASValue; import net.spy.memcached.CachedData; @@ -27,6 +30,7 @@ import net.spy.memcached.ConnectionObserver; import net.spy.memcached.MemcachedClientIF; import net.spy.memcached.NodeLocator; import net.spy.memcached.internal.BulkFuture; +import net.spy.memcached.ops.OperationStatus; import net.spy.memcached.transcoders.SerializingTranscoder; import net.spy.memcached.transcoders.Transcoder; import org.junit.Assert; @@ -34,6 +38,7 @@ import org.junit.Before; import org.junit.Test; import java.net.SocketAddress; +import java.nio.ByteBuffer; import java.util.Collection; import java.util.Iterator; import java.util.Map; @@ -85,6 +90,28 @@ public class MemcachedCacheBrokerTest cache.close("a"); } + @Test + public void testGetBulk() throws Exception + { + Assert.assertNull(cache.get("the", HI)); + + put(cache, "the", HI, 2); + put(cache, "the", HO, 10); + + Pair key1 = Pair.of("the", ByteBuffer.wrap(HI)); + Pair key2 = Pair.of("the", ByteBuffer.wrap(HO)); + + Map, byte[]> result = cache.getBulk( + Lists.newArrayList( + key1, + key2 + ) + ); + + Assert.assertEquals(2, Ints.fromByteArray(result.get(key1))); + Assert.assertEquals(10, Ints.fromByteArray(result.get(key2))); + } + public void put(CacheBroker cache, String identifier, byte[] key, Integer value) { cache.put(identifier, key, Ints.toByteArray(value)); @@ -362,9 +389,67 @@ class MockMemcachedClient implements MemcachedClientIF } @Override - public BulkFuture> asyncGetBulk(Iterator keys, Transcoder tc) + public BulkFuture> asyncGetBulk(final Iterator keys, final Transcoder tc) { - throw new UnsupportedOperationException("not implemented"); + return new BulkFuture>() + { + @Override + public boolean isTimeout() + { + return false; + } + + @Override + public Map getSome(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException + { + return get(); + } + + @Override + public OperationStatus getStatus() + { + return null; + } + + @Override + public boolean cancel(boolean b) + { + return false; + } + + @Override + public boolean isCancelled() + { + return false; + } + + @Override + public boolean isDone() + { + return true; + } + + @Override + public Map get() throws InterruptedException, ExecutionException + { + Map retVal = Maps.newHashMap(); + + while(keys.hasNext()) { + String key = keys.next(); + CachedData data = theMap.get(key); + retVal.put(key, data != null ? tc.decode(data) : null); + } + + return retVal; + } + + @Override + public Map get(long l, TimeUnit timeUnit) + throws InterruptedException, ExecutionException, TimeoutException + { + return get(); + } + }; } @Override @@ -380,9 +465,9 @@ class MockMemcachedClient implements MemcachedClientIF } @Override - public BulkFuture> asyncGetBulk(Collection keys) + public BulkFuture> asyncGetBulk(final Collection keys) { - throw new UnsupportedOperationException("not implemented"); + return asyncGetBulk(keys.iterator(), transcoder); } @Override From 0bacb85a4a7e62202d1618c548c980ec50ba653b Mon Sep 17 00:00:00 2001 From: xvrl Date: Wed, 16 Jan 2013 19:18:14 -0800 Subject: [PATCH 4/9] fix duplicate keys, shutdown gracefully and make sure we check all multiget keys in memcached benchmark --- .../client/cache/MemcachedCacheBrokerBenchmark.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheBrokerBenchmark.java b/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheBrokerBenchmark.java index d0b34af57df..14bd9fdc998 100644 --- a/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheBrokerBenchmark.java +++ b/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheBrokerBenchmark.java @@ -44,8 +44,6 @@ public class MemcachedCacheBrokerBenchmark extends SimpleBenchmark // disable compression transcoder.setCompressionThreshold(Integer.MAX_VALUE); - System.out.println(String.format("Using memcached hosts [%s]", hosts)); - client = new MemcachedClient( new ConnectionFactoryBuilder().setProtocol(ConnectionFactoryBuilder.Protocol.BINARY) .setHashAlg(DefaultHashAlgorithm.FNV1A_64_HASH) @@ -72,14 +70,13 @@ public class MemcachedCacheBrokerBenchmark extends SimpleBenchmark @Override protected void tearDown() throws Exception { - client.flush(); - client.shutdown(); + client.shutdown(1, TimeUnit.MINUTES); } public void timePutObjects(int reps) { for(int i = 0; i < reps; ++i) { for(int k = 0; k < objectCount; ++k) { - String key = BASE_KEY + i; + String key = BASE_KEY + k; cache.put(IDENTIFIER, key.getBytes(), randBytes); } // make sure the write queue is empty @@ -109,7 +106,10 @@ public class MemcachedCacheBrokerBenchmark extends SimpleBenchmark keys.add(Pair.of(IDENTIFIER, ByteBuffer.wrap(key.getBytes()))); } Map, byte[]> results = cache.getBulk(keys); - for(byte[] bytes : results.values()) count += bytes.length; + for(Pair key : keys) { + byte[] bytes = results.get(key); + count += bytes.length; + } } return count; } From 9032ef521b10f85f3513dda6b2ff66f8668ab4f4 Mon Sep 17 00:00:00 2001 From: xvrl Date: Fri, 18 Jan 2013 10:18:16 -0800 Subject: [PATCH 5/9] fix interrupted thread --- .../com/metamx/druid/client/cache/MemcachedCacheBroker.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/client/src/main/java/com/metamx/druid/client/cache/MemcachedCacheBroker.java b/client/src/main/java/com/metamx/druid/client/cache/MemcachedCacheBroker.java index f2b709de8e5..07953479337 100644 --- a/client/src/main/java/com/metamx/druid/client/cache/MemcachedCacheBroker.java +++ b/client/src/main/java/com/metamx/druid/client/cache/MemcachedCacheBroker.java @@ -122,6 +122,7 @@ public class MemcachedCacheBroker implements CacheBroker return null; } catch(InterruptedException e) { + Thread.currentThread().interrupt(); throw Throwables.propagate(e); } catch(ExecutionException e) { @@ -175,6 +176,7 @@ public class MemcachedCacheBroker implements CacheBroker return results; } catch(InterruptedException e) { + Thread.currentThread().interrupt(); throw Throwables.propagate(e); } catch(ExecutionException e) { From a70ae155851f2b5f42d2c5a3c451185274a6c00a Mon Sep 17 00:00:00 2001 From: xvrl Date: Fri, 18 Jan 2013 10:00:06 -0800 Subject: [PATCH 6/9] replace Pair with NamedKey --- .../druid/client/CachingClusteredClient.java | 33 +++++------ .../druid/client/cache/CacheBroker.java | 55 +++++++++++++++++-- .../druid/client/cache/MapCacheBroker.java | 24 ++++---- .../client/cache/MemcachedCacheBroker.java | 32 +++++------ .../client/cache/MapCacheBrokerTest.java | 16 +++--- .../cache/MemcachedCacheBrokerBenchmark.java | 16 +++--- .../cache/MemcachedCacheBrokerTest.java | 24 ++++---- 7 files changed, 115 insertions(+), 85 deletions(-) diff --git a/client/src/main/java/com/metamx/druid/client/CachingClusteredClient.java b/client/src/main/java/com/metamx/druid/client/CachingClusteredClient.java index 23f5ae6992e..625aef4d0e1 100644 --- a/client/src/main/java/com/metamx/druid/client/CachingClusteredClient.java +++ b/client/src/main/java/com/metamx/druid/client/CachingClusteredClient.java @@ -21,6 +21,7 @@ package com.metamx.druid.client; import com.google.common.base.Function; import com.google.common.base.Predicate; +import com.google.common.base.Predicates; import com.google.common.base.Supplier; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; @@ -133,7 +134,7 @@ public class CachingClusteredClient implements QueryRunner return Sequences.empty(); } - Map, Pair> segments = Maps.newLinkedHashMap(); + Map, CacheBroker.NamedKey> segments = Maps.newLinkedHashMap(); final byte[] queryCacheKey = (strategy != null) ? strategy.computeCacheKey(query) : null; @@ -156,21 +157,13 @@ public class CachingClusteredClient implements QueryRunner } } - Map, byte[]> cachedValues = cacheBroker.getBulk( - Iterables.filter(segments.values(), new Predicate>() - { - @Override - public boolean apply( - @Nullable Pair input - ) - { - return input != null; - } - }) + Map cachedValues = cacheBroker.getBulk( + Iterables.filter(segments.values(), Predicates.notNull()) ); - for(Pair segment : segments.keySet()) { - Pair segmentCacheKey = segments.get(segment); + for(Map.Entry, CacheBroker.NamedKey> entry : segments.entrySet()) { + Pair segment = entry.getKey(); + CacheBroker.NamedKey segmentCacheKey = entry.getValue(); final ServerSelector selector = segment.lhs; final SegmentDescriptor descriptor = segment.rhs; @@ -335,19 +328,19 @@ public class CachingClusteredClient implements QueryRunner ); } - private Pair computeSegmentCacheKey(String segmentIdentifier, SegmentDescriptor descriptor, byte[] queryCacheKey) + private CacheBroker.NamedKey computeSegmentCacheKey(String segmentIdentifier, SegmentDescriptor descriptor, byte[] queryCacheKey) { final Interval segmentQueryInterval = descriptor.getInterval(); final byte[] versionBytes = descriptor.getVersion().getBytes(); - return Pair.of( + return new CacheBroker.NamedKey( segmentIdentifier, ByteBuffer .allocate(16 + versionBytes.length + 4 + queryCacheKey.length) .putLong(segmentQueryInterval.getStartMillis()) .putLong(segmentQueryInterval.getEndMillis()) .put(versionBytes) .putInt(descriptor.getPartitionNumber()) - .put(queryCacheKey) + .put(queryCacheKey).array() ); } @@ -355,9 +348,9 @@ public class CachingClusteredClient implements QueryRunner { private final CacheBroker cache; private final ObjectMapper mapper; - private final Pair key; + private final CacheBroker.NamedKey key; - public CachePopulator(CacheBroker cache, ObjectMapper mapper, Pair key) + public CachePopulator(CacheBroker cache, ObjectMapper mapper, CacheBroker.NamedKey key) { this.cache = cache; this.mapper = mapper; @@ -382,7 +375,7 @@ public class CachingClusteredClient implements QueryRunner offset += array.length; } - cache.put(key.lhs, key.rhs.array(), valueBytes); + cache.put(key, valueBytes); } catch (IOException e) { throw Throwables.propagate(e); diff --git a/client/src/main/java/com/metamx/druid/client/cache/CacheBroker.java b/client/src/main/java/com/metamx/druid/client/cache/CacheBroker.java index b1dc548a962..adcba2e489c 100644 --- a/client/src/main/java/com/metamx/druid/client/cache/CacheBroker.java +++ b/client/src/main/java/com/metamx/druid/client/cache/CacheBroker.java @@ -19,20 +19,63 @@ package com.metamx.druid.client.cache; -import com.metamx.common.Pair; +import com.google.common.base.Preconditions; -import java.nio.ByteBuffer; +import java.util.Arrays; import java.util.Map; /** */ public interface CacheBroker { - public byte[] get(String identifier, byte[] key); - public void put(String identifier, byte[] key, byte[] value); - public Map, byte[]> getBulk(Iterable> identifierKeyPairs); + public byte[] get(NamedKey key); + public void put(NamedKey key, byte[] value); + public Map getBulk(Iterable keys); - public void close(String identifier); + public void close(String namespace); public CacheStats getStats(); + + public class NamedKey + { + final public String namespace; + final public byte[] key; + + public NamedKey(String namespace, byte[] key) { + Preconditions.checkArgument(namespace != null, "namespace must not be null"); + Preconditions.checkArgument(key != null, "key must not be null"); + this.namespace = namespace; + this.key = key; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + NamedKey namedKey = (NamedKey) o; + + if (!namespace.equals(namedKey.namespace)) { + return false; + } + if (!Arrays.equals(key, namedKey.key)) { + return false; + } + + return true; + } + + @Override + public int hashCode() + { + int result = namespace.hashCode(); + result = 31 * result + Arrays.hashCode(key); + return result; + } + } } diff --git a/client/src/main/java/com/metamx/druid/client/cache/MapCacheBroker.java b/client/src/main/java/com/metamx/druid/client/cache/MapCacheBroker.java index 85b9156c3fd..d541ab40a66 100644 --- a/client/src/main/java/com/metamx/druid/client/cache/MapCacheBroker.java +++ b/client/src/main/java/com/metamx/druid/client/cache/MapCacheBroker.java @@ -94,32 +94,32 @@ public class MapCacheBroker implements CacheBroker } @Override - public byte[] get(String identifier, byte[] key) + public byte[] get(NamedKey key) { - return provideCache(identifier).get(key); + return provideCache(key.namespace).get(key.key); } @Override - public void put(String identifier, byte[] key, byte[] value) + public void put(NamedKey key, byte[] value) { - provideCache(identifier).put(key, value); + provideCache(key.namespace).put(key.key, value); } @Override - public Map, byte[]> getBulk(Iterable> identifierKeyPairs) + public Map getBulk(Iterable keys) { - Map, byte[]> retVal = Maps.newHashMap(); + Map retVal = Maps.newHashMap(); - for(Pair e : identifierKeyPairs) { - retVal.put(e, provideCache(e.lhs).get(e.rhs.array())); + for(NamedKey key : keys) { + retVal.put(key, provideCache(key.namespace).get(key.key)); } return retVal; } @Override - public void close(String identifier) + public void close(String namespace) { - provideCache(identifier).close(); + provideCache(namespace).close(); } private Cache provideCache(final String identifier) @@ -148,7 +148,7 @@ public class MapCacheBroker implements CacheBroker } return retVal; } - throw new ISE("Cache for identifier[%s] is closed.", identifier); + throw new ISE("Cache for namespace[%s] is closed.", identifier); } @Override @@ -160,7 +160,7 @@ public class MapCacheBroker implements CacheBroker return; } } - throw new ISE("Cache for identifier[%s] is closed.", identifier); + throw new ISE("Cache for namespace[%s] is closed.", identifier); } @Override diff --git a/client/src/main/java/com/metamx/druid/client/cache/MemcachedCacheBroker.java b/client/src/main/java/com/metamx/druid/client/cache/MemcachedCacheBroker.java index 07953479337..fea391f7912 100644 --- a/client/src/main/java/com/metamx/druid/client/cache/MemcachedCacheBroker.java +++ b/client/src/main/java/com/metamx/druid/client/cache/MemcachedCacheBroker.java @@ -21,9 +21,7 @@ package com.metamx.druid.client.cache; import com.google.common.base.Function; import com.google.common.base.Throwables; -import com.google.common.collect.Iterables; import com.google.common.collect.Maps; -import com.metamx.common.Pair; import net.iharder.base64.Base64; import net.spy.memcached.AddrUtil; import net.spy.memcached.ConnectionFactoryBuilder; @@ -36,8 +34,6 @@ import net.spy.memcached.transcoders.SerializingTranscoder; import javax.annotation.Nullable; import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Collection; import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -103,9 +99,9 @@ public class MemcachedCacheBroker implements CacheBroker } @Override - public byte[] get(String identifier, byte[] key) + public byte[] get(NamedKey key) { - Future future = client.asyncGet(computeKey(identifier, key)); + Future future = client.asyncGet(computeKeyString(key)); try { byte[] bytes = (byte[]) future.get(timeout, TimeUnit.MILLISECONDS); if(bytes != null) { @@ -131,24 +127,24 @@ public class MemcachedCacheBroker implements CacheBroker } @Override - public void put(String identifier, byte[] key, byte[] value) + public void put(NamedKey key, byte[] value) { - client.set(computeKey(identifier, key), expiration, value); + client.set(computeKeyString(key), expiration, value); } @Override - public Map, byte[]> getBulk(Iterable> identifierKeyPairs) + public Map getBulk(Iterable keys) { - Map> keyLookup = Maps.uniqueIndex( - identifierKeyPairs, - new Function, String>() + Map keyLookup = Maps.uniqueIndex( + keys, + new Function() { @Override public String apply( - @Nullable Pair input + @Nullable NamedKey input ) { - return computeKey(input.lhs, input.rhs.array()); + return computeKeyString(input); } } ); @@ -165,7 +161,7 @@ public class MemcachedCacheBroker implements CacheBroker missCount.addAndGet(keyLookup.size() - some.size()); hitCount.addAndGet(some.size()); - Map, byte[]> results = Maps.newHashMap(); + Map results = Maps.newHashMap(); for(Map.Entry entry : some.entrySet()) { results.put( keyLookup.get(entry.getKey()), @@ -185,12 +181,12 @@ public class MemcachedCacheBroker implements CacheBroker } @Override - public void close(String identifier) + public void close(String namespace) { // no resources to cleanup } - private String computeKey(String identifier, byte[] key) { - return identifier + ":" + Base64.encodeBytes(key, Base64.DONT_BREAK_LINES); + private static String computeKeyString(NamedKey key) { + return key.namespace + ":" + Base64.encodeBytes(key.key, Base64.DONT_BREAK_LINES); } } diff --git a/client/src/test/java/com/metamx/druid/client/cache/MapCacheBrokerTest.java b/client/src/test/java/com/metamx/druid/client/cache/MapCacheBrokerTest.java index 2041e2dca05..35ea7ac42d9 100644 --- a/client/src/test/java/com/metamx/druid/client/cache/MapCacheBrokerTest.java +++ b/client/src/test/java/com/metamx/druid/client/cache/MapCacheBrokerTest.java @@ -43,12 +43,12 @@ public class MapCacheBrokerTest @Test public void testSanity() throws Exception { - Assert.assertNull(cache.get("a", HI)); + Assert.assertNull(cache.get(new CacheBroker.NamedKey("a", HI))); Assert.assertEquals(0, baseMap.size()); put(cache, "a", HI, 1); Assert.assertEquals(1, baseMap.size()); Assert.assertEquals(1, get(cache, "a", HI)); - Assert.assertNull(cache.get("the", HI)); + Assert.assertNull(cache.get(new CacheBroker.NamedKey("the", HI))); put(cache, "the", HI, 2); Assert.assertEquals(2, baseMap.size()); @@ -58,26 +58,26 @@ public class MapCacheBrokerTest put(cache, "the", HO, 10); Assert.assertEquals(3, baseMap.size()); Assert.assertEquals(1, get(cache, "a", HI)); - Assert.assertNull(cache.get("a", HO)); + Assert.assertNull(cache.get(new CacheBroker.NamedKey("a", HO))); Assert.assertEquals(2, get(cache, "the", HI)); Assert.assertEquals(10, get(cache, "the", HO)); cache.close("the"); Assert.assertEquals(1, baseMap.size()); Assert.assertEquals(1, get(cache, "a", HI)); - Assert.assertNull(cache.get("a", HO)); + Assert.assertNull(cache.get(new CacheBroker.NamedKey("a", HO))); cache.close("a"); Assert.assertEquals(0, baseMap.size()); } - public void put(CacheBroker cache, String identifier, byte[] key, Integer value) + public void put(CacheBroker cache, String namespace, byte[] key, Integer value) { - cache.put(identifier, key, Ints.toByteArray(value)); + cache.put(new CacheBroker.NamedKey(namespace, key), Ints.toByteArray(value)); } - public int get(CacheBroker cache, String identifier, byte[] key) + public int get(CacheBroker cache, String namespace, byte[] key) { - return Ints.fromByteArray(cache.get(identifier, key)); + return Ints.fromByteArray(cache.get(new CacheBroker.NamedKey(namespace, key))); } } diff --git a/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheBrokerBenchmark.java b/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheBrokerBenchmark.java index 14bd9fdc998..7c25f3ee954 100644 --- a/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheBrokerBenchmark.java +++ b/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheBrokerBenchmark.java @@ -4,7 +4,6 @@ import com.google.caliper.Param; import com.google.caliper.Runner; import com.google.caliper.SimpleBenchmark; import com.google.common.collect.Lists; -import com.metamx.common.Pair; import net.spy.memcached.AddrUtil; import net.spy.memcached.ConnectionFactoryBuilder; import net.spy.memcached.DefaultHashAlgorithm; @@ -13,7 +12,6 @@ import net.spy.memcached.MemcachedClient; import net.spy.memcached.MemcachedClientIF; import net.spy.memcached.transcoders.SerializingTranscoder; -import java.nio.ByteBuffer; import java.util.List; import java.util.Map; import java.util.Random; @@ -22,7 +20,7 @@ import java.util.concurrent.TimeUnit; public class MemcachedCacheBrokerBenchmark extends SimpleBenchmark { private static final String BASE_KEY = "test_2012-11-26T00:00:00.000Z_2012-11-27T00:00:00.000Z_2012-11-27T04:11:25.979Z_"; - public static final String IDENTIFIER = "default"; + public static final String NAMESPACE = "default"; private MemcachedCacheBroker cache; private MemcachedClientIF client; @@ -77,7 +75,7 @@ public class MemcachedCacheBrokerBenchmark extends SimpleBenchmark for(int i = 0; i < reps; ++i) { for(int k = 0; k < objectCount; ++k) { String key = BASE_KEY + k; - cache.put(IDENTIFIER, key.getBytes(), randBytes); + cache.put(new CacheBroker.NamedKey(NAMESPACE, key.getBytes()), randBytes); } // make sure the write queue is empty client.waitForQueues(1, TimeUnit.HOURS); @@ -90,7 +88,7 @@ public class MemcachedCacheBrokerBenchmark extends SimpleBenchmark for (int i = 0; i < reps; i++) { for(int k = 0; k < objectCount; ++k) { String key = BASE_KEY + k; - bytes = cache.get(IDENTIFIER, key.getBytes()); + bytes = cache.get(new CacheBroker.NamedKey(NAMESPACE, key.getBytes())); count += bytes.length; } } @@ -100,13 +98,13 @@ public class MemcachedCacheBrokerBenchmark extends SimpleBenchmark public long timeBulkGetObjects(int reps) { long count = 0; for (int i = 0; i < reps; i++) { - List> keys = Lists.newArrayList(); + List keys = Lists.newArrayList(); for(int k = 0; k < objectCount; ++k) { String key = BASE_KEY + k; - keys.add(Pair.of(IDENTIFIER, ByteBuffer.wrap(key.getBytes()))); + keys.add(new CacheBroker.NamedKey(NAMESPACE, key.getBytes())); } - Map, byte[]> results = cache.getBulk(keys); - for(Pair key : keys) { + Map results = cache.getBulk(keys); + for(CacheBroker.NamedKey key : keys) { byte[] bytes = results.get(key); count += bytes.length; } diff --git a/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheBrokerTest.java b/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheBrokerTest.java index 80ddd0072b5..31030260df7 100644 --- a/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheBrokerTest.java +++ b/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheBrokerTest.java @@ -68,10 +68,10 @@ public class MemcachedCacheBrokerTest @Test public void testSanity() throws Exception { - Assert.assertNull(cache.get("a", HI)); + Assert.assertNull(cache.get(new CacheBroker.NamedKey("a", HI))); put(cache, "a", HI, 1); Assert.assertEquals(1, get(cache, "a", HI)); - Assert.assertNull(cache.get("the", HI)); + Assert.assertNull(cache.get(new CacheBroker.NamedKey("the", HI))); put(cache, "the", HI, 2); Assert.assertEquals(1, get(cache, "a", HI)); @@ -79,13 +79,13 @@ public class MemcachedCacheBrokerTest put(cache, "the", HO, 10); Assert.assertEquals(1, get(cache, "a", HI)); - Assert.assertNull(cache.get("a", HO)); + Assert.assertNull(cache.get(new CacheBroker.NamedKey("a", HO))); Assert.assertEquals(2, get(cache, "the", HI)); Assert.assertEquals(10, get(cache, "the", HO)); cache.close("the"); Assert.assertEquals(1, get(cache, "a", HI)); - Assert.assertNull(cache.get("a", HO)); + Assert.assertNull(cache.get(new CacheBroker.NamedKey("a", HO))); cache.close("a"); } @@ -93,15 +93,15 @@ public class MemcachedCacheBrokerTest @Test public void testGetBulk() throws Exception { - Assert.assertNull(cache.get("the", HI)); + Assert.assertNull(cache.get(new CacheBroker.NamedKey("the", HI))); put(cache, "the", HI, 2); put(cache, "the", HO, 10); - Pair key1 = Pair.of("the", ByteBuffer.wrap(HI)); - Pair key2 = Pair.of("the", ByteBuffer.wrap(HO)); + CacheBroker.NamedKey key1 = new CacheBroker.NamedKey("the", HI); + CacheBroker.NamedKey key2 = new CacheBroker.NamedKey("the", HO); - Map, byte[]> result = cache.getBulk( + Map result = cache.getBulk( Lists.newArrayList( key1, key2 @@ -112,14 +112,14 @@ public class MemcachedCacheBrokerTest Assert.assertEquals(10, Ints.fromByteArray(result.get(key2))); } - public void put(CacheBroker cache, String identifier, byte[] key, Integer value) + public void put(CacheBroker cache, String namespace, byte[] key, Integer value) { - cache.put(identifier, key, Ints.toByteArray(value)); + cache.put(new CacheBroker.NamedKey(namespace, key), Ints.toByteArray(value)); } - public int get(CacheBroker cache, String identifier, byte[] key) + public int get(CacheBroker cache, String namespace, byte[] key) { - return Ints.fromByteArray(cache.get(identifier, key)); + return Ints.fromByteArray(cache.get(new CacheBroker.NamedKey(namespace, key))); } } From 86ca8967cad017fb059ec618e25f45d967e41b77 Mon Sep 17 00:00:00 2001 From: xvrl Date: Fri, 18 Jan 2013 15:04:12 -0800 Subject: [PATCH 7/9] rework code pulling from cache to be more readable --- .../druid/client/CachingClusteredClient.java | 77 +++++++++++-------- 1 file changed, 45 insertions(+), 32 deletions(-) diff --git a/client/src/main/java/com/metamx/druid/client/CachingClusteredClient.java b/client/src/main/java/com/metamx/druid/client/CachingClusteredClient.java index 625aef4d0e1..d69f91e6e14 100644 --- a/client/src/main/java/com/metamx/druid/client/CachingClusteredClient.java +++ b/client/src/main/java/com/metamx/druid/client/CachingClusteredClient.java @@ -20,8 +20,6 @@ package com.metamx.druid.client; import com.google.common.base.Function; -import com.google.common.base.Predicate; -import com.google.common.base.Predicates; import com.google.common.base.Supplier; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; @@ -30,6 +28,7 @@ import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Ordering; +import com.google.common.collect.Sets; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.metamx.common.ISE; import com.metamx.common.Pair; @@ -66,6 +65,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.Executors; /** @@ -134,9 +134,8 @@ public class CachingClusteredClient implements QueryRunner return Sequences.empty(); } - Map, CacheBroker.NamedKey> segments = Maps.newLinkedHashMap(); - - final byte[] queryCacheKey = (strategy != null) ? strategy.computeCacheKey(query) : null; + // build set of segments to query + Set> segments = Sets.newLinkedHashSet(); for (Interval interval : rewrittenQuery.getIntervals()) { List> serversLookup = timeline.lookup(interval); @@ -148,43 +147,44 @@ public class CachingClusteredClient implements QueryRunner holder.getInterval(), holder.getVersion(), chunk.getChunkNumber() ); - segments.put( - Pair.of(selector, descriptor), - queryCacheKey == null ? null : - computeSegmentCacheKey(selector.getSegment().getIdentifier(), descriptor, queryCacheKey) - ); + segments.add(Pair.of(selector, descriptor)); } } } - Map cachedValues = cacheBroker.getBulk( - Iterables.filter(segments.values(), Predicates.notNull()) - ); + final byte[] queryCacheKey; + if(strategy != null) { + queryCacheKey = strategy.computeCacheKey(query); + } else { + queryCacheKey = null; + } - for(Map.Entry, CacheBroker.NamedKey> entry : segments.entrySet()) { - Pair segment = entry.getKey(); - CacheBroker.NamedKey segmentCacheKey = entry.getValue(); + // Pull cached segments from cache and remove from set of segments to query + if(useCache && queryCacheKey != null) { + Map, CacheBroker.NamedKey> cacheKeys = Maps.newHashMap(); + for(Pair e : segments) { + cacheKeys.put(e, computeSegmentCacheKey(e.lhs.getSegment().getIdentifier(), e.rhs, queryCacheKey)); + } - final ServerSelector selector = segment.lhs; - final SegmentDescriptor descriptor = segment.rhs; - final Interval segmentQueryInterval = descriptor.getInterval(); + Map cachedValues = cacheBroker.getBulk(cacheKeys.values()); - final byte[] cachedValue = segmentCacheKey == null ? null : cachedValues.get(segmentCacheKey); + for(Map.Entry, CacheBroker.NamedKey> entry : cacheKeys.entrySet()) { + Pair segment = entry.getKey(); + CacheBroker.NamedKey segmentCacheKey = entry.getValue(); - if (useCache && cachedValue != null) { - cachedResults.add(Pair.of(segmentQueryInterval.getStart(), cachedValue)); - } else { - final DruidServer server = selector.pick(); - List descriptors = serverSegments.get(server); + final ServerSelector selector = segment.lhs; + final SegmentDescriptor descriptor = segment.rhs; + final Interval segmentQueryInterval = descriptor.getInterval(); - if (descriptors == null) { - descriptors = Lists.newArrayList(); - serverSegments.put(server, descriptors); + final byte[] cachedValue = cachedValues.get(segmentCacheKey); + + if (cachedValue != null) { + cachedResults.add(Pair.of(segmentQueryInterval.getStart(), cachedValue)); + + // remove cached segment from set of segments to query + segments.remove(segment); } - - descriptors.add(descriptor); - - if(segmentCacheKey != null) { + else { final String segmentIdentifier = selector.getSegment().getIdentifier(); cachePopulatorMap.put( String.format("%s_%s", segmentIdentifier, segmentQueryInterval), @@ -194,6 +194,19 @@ public class CachingClusteredClient implements QueryRunner } } + // Compile list of all segments not pulled from cache + for(Pair segment : segments) { + final DruidServer server = segment.lhs.pick(); + List descriptors = serverSegments.get(server); + + if (descriptors == null) { + descriptors = Lists.newArrayList(); + serverSegments.put(server, descriptors); + } + + descriptors.add(segment.rhs); + } + return new LazySequence( new Supplier>() From e0c34c3b972e94b4734216f2cbd28bafa1e02baf Mon Sep 17 00:00:00 2001 From: xvrl Date: Fri, 18 Jan 2013 15:22:56 -0800 Subject: [PATCH 8/9] rename cacheBroker -> cache --- .../druid/client/CachingClusteredClient.java | 31 ++++++++------- .../cache/{CacheBroker.java => Cache.java} | 2 +- .../druid/client/cache/CacheMonitor.java | 8 ++-- .../{MapCacheBroker.java => MapCache.java} | 9 ++--- ...eBrokerConfig.java => MapCacheConfig.java} | 2 +- ...edCacheBroker.java => MemcachedCache.java} | 8 ++-- ...rConfig.java => MemcachedCacheConfig.java} | 2 +- .../com/metamx/druid/http/BrokerNode.java | 38 +++++++++---------- .../client/cache/MapCacheBrokerTest.java | 20 +++++----- .../cache/MemcachedCacheBrokerBenchmark.java | 16 ++++---- .../cache/MemcachedCacheBrokerTest.java | 30 +++++++-------- 11 files changed, 81 insertions(+), 85 deletions(-) rename client/src/main/java/com/metamx/druid/client/cache/{CacheBroker.java => Cache.java} (98%) rename client/src/main/java/com/metamx/druid/client/cache/{MapCacheBroker.java => MapCache.java} (96%) rename client/src/main/java/com/metamx/druid/client/cache/{MapCacheBrokerConfig.java => MapCacheConfig.java} (96%) rename client/src/main/java/com/metamx/druid/client/cache/{MemcachedCacheBroker.java => MemcachedCache.java} (95%) rename client/src/main/java/com/metamx/druid/client/cache/{MemcachedCacheBrokerConfig.java => MemcachedCacheConfig.java} (89%) diff --git a/client/src/main/java/com/metamx/druid/client/CachingClusteredClient.java b/client/src/main/java/com/metamx/druid/client/CachingClusteredClient.java index d69f91e6e14..5d640dc4e38 100644 --- a/client/src/main/java/com/metamx/druid/client/CachingClusteredClient.java +++ b/client/src/main/java/com/metamx/druid/client/CachingClusteredClient.java @@ -41,7 +41,7 @@ import com.metamx.druid.Query; import com.metamx.druid.TimelineObjectHolder; import com.metamx.druid.VersionedIntervalTimeline; import com.metamx.druid.aggregation.AggregatorFactory; -import com.metamx.druid.client.cache.CacheBroker; +import com.metamx.druid.client.cache.Cache; import com.metamx.druid.client.selector.ServerSelector; import com.metamx.druid.partition.PartitionChunk; import com.metamx.druid.query.CacheStrategy; @@ -57,7 +57,6 @@ import org.codehaus.jackson.map.ObjectMapper; import org.joda.time.DateTime; import org.joda.time.Interval; -import javax.annotation.Nullable; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -76,19 +75,19 @@ public class CachingClusteredClient implements QueryRunner private final QueryToolChestWarehouse warehouse; private final ServerView serverView; - private final CacheBroker cacheBroker; + private final Cache cache; private final ObjectMapper objectMapper; public CachingClusteredClient( QueryToolChestWarehouse warehouse, ServerView serverView, - CacheBroker cacheBroker, + Cache cache, ObjectMapper objectMapper ) { this.warehouse = warehouse; this.serverView = serverView; - this.cacheBroker = cacheBroker; + this.cache = cache; this.objectMapper = objectMapper; serverView.registerSegmentCallback( @@ -100,7 +99,7 @@ public class CachingClusteredClient implements QueryRunner @Override public ServerView.CallbackAction segmentRemoved(DruidServer server, DataSegment segment) { - CachingClusteredClient.this.cacheBroker.close(segment.getIdentifier()); + CachingClusteredClient.this.cache.close(segment.getIdentifier()); return ServerView.CallbackAction.CONTINUE; } } @@ -161,16 +160,16 @@ public class CachingClusteredClient implements QueryRunner // Pull cached segments from cache and remove from set of segments to query if(useCache && queryCacheKey != null) { - Map, CacheBroker.NamedKey> cacheKeys = Maps.newHashMap(); + Map, Cache.NamedKey> cacheKeys = Maps.newHashMap(); for(Pair e : segments) { cacheKeys.put(e, computeSegmentCacheKey(e.lhs.getSegment().getIdentifier(), e.rhs, queryCacheKey)); } - Map cachedValues = cacheBroker.getBulk(cacheKeys.values()); + Map cachedValues = cache.getBulk(cacheKeys.values()); - for(Map.Entry, CacheBroker.NamedKey> entry : cacheKeys.entrySet()) { + for(Map.Entry, Cache.NamedKey> entry : cacheKeys.entrySet()) { Pair segment = entry.getKey(); - CacheBroker.NamedKey segmentCacheKey = entry.getValue(); + Cache.NamedKey segmentCacheKey = entry.getValue(); final ServerSelector selector = segment.lhs; final SegmentDescriptor descriptor = segment.rhs; @@ -188,7 +187,7 @@ public class CachingClusteredClient implements QueryRunner final String segmentIdentifier = selector.getSegment().getIdentifier(); cachePopulatorMap.put( String.format("%s_%s", segmentIdentifier, segmentQueryInterval), - new CachePopulator(cacheBroker, objectMapper, segmentCacheKey) + new CachePopulator(cache, objectMapper, segmentCacheKey) ); } } @@ -341,12 +340,12 @@ public class CachingClusteredClient implements QueryRunner ); } - private CacheBroker.NamedKey computeSegmentCacheKey(String segmentIdentifier, SegmentDescriptor descriptor, byte[] queryCacheKey) + private Cache.NamedKey computeSegmentCacheKey(String segmentIdentifier, SegmentDescriptor descriptor, byte[] queryCacheKey) { final Interval segmentQueryInterval = descriptor.getInterval(); final byte[] versionBytes = descriptor.getVersion().getBytes(); - return new CacheBroker.NamedKey( + return new Cache.NamedKey( segmentIdentifier, ByteBuffer .allocate(16 + versionBytes.length + 4 + queryCacheKey.length) .putLong(segmentQueryInterval.getStartMillis()) @@ -359,11 +358,11 @@ public class CachingClusteredClient implements QueryRunner private static class CachePopulator { - private final CacheBroker cache; + private final Cache cache; private final ObjectMapper mapper; - private final CacheBroker.NamedKey key; + private final Cache.NamedKey key; - public CachePopulator(CacheBroker cache, ObjectMapper mapper, CacheBroker.NamedKey key) + public CachePopulator(Cache cache, ObjectMapper mapper, Cache.NamedKey key) { this.cache = cache; this.mapper = mapper; diff --git a/client/src/main/java/com/metamx/druid/client/cache/CacheBroker.java b/client/src/main/java/com/metamx/druid/client/cache/Cache.java similarity index 98% rename from client/src/main/java/com/metamx/druid/client/cache/CacheBroker.java rename to client/src/main/java/com/metamx/druid/client/cache/Cache.java index adcba2e489c..9bf0cde33e7 100644 --- a/client/src/main/java/com/metamx/druid/client/cache/CacheBroker.java +++ b/client/src/main/java/com/metamx/druid/client/cache/Cache.java @@ -26,7 +26,7 @@ import java.util.Map; /** */ -public interface CacheBroker +public interface Cache { public byte[] get(NamedKey key); public void put(NamedKey key, byte[] value); diff --git a/client/src/main/java/com/metamx/druid/client/cache/CacheMonitor.java b/client/src/main/java/com/metamx/druid/client/cache/CacheMonitor.java index d1337163ba6..b0c36629e89 100644 --- a/client/src/main/java/com/metamx/druid/client/cache/CacheMonitor.java +++ b/client/src/main/java/com/metamx/druid/client/cache/CacheMonitor.java @@ -27,21 +27,21 @@ import com.metamx.metrics.AbstractMonitor; */ public class CacheMonitor extends AbstractMonitor { - private final CacheBroker cacheBroker; + private final Cache cache; private volatile CacheStats prevCacheStats = null; public CacheMonitor( - CacheBroker cacheBroker + Cache cache ) { - this.cacheBroker = cacheBroker; + this.cache = cache; } @Override public boolean doMonitor(ServiceEmitter emitter) { - final CacheStats currCacheStats = cacheBroker.getStats(); + final CacheStats currCacheStats = cache.getStats(); final CacheStats deltaCacheStats = currCacheStats.delta(prevCacheStats); final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder(); diff --git a/client/src/main/java/com/metamx/druid/client/cache/MapCacheBroker.java b/client/src/main/java/com/metamx/druid/client/cache/MapCache.java similarity index 96% rename from client/src/main/java/com/metamx/druid/client/cache/MapCacheBroker.java rename to client/src/main/java/com/metamx/druid/client/cache/MapCache.java index d541ab40a66..2780acd0f9e 100644 --- a/client/src/main/java/com/metamx/druid/client/cache/MapCacheBroker.java +++ b/client/src/main/java/com/metamx/druid/client/cache/MapCache.java @@ -22,7 +22,6 @@ package com.metamx.druid.client.cache; import com.google.common.collect.Maps; import com.google.common.primitives.Ints; import com.metamx.common.ISE; -import com.metamx.common.Pair; import java.nio.ByteBuffer; import java.util.Collections; @@ -33,7 +32,7 @@ import java.util.concurrent.atomic.AtomicLong; /** */ -public class MapCacheBroker implements CacheBroker +public class MapCache implements Cache { /** * An interface to limit the operations that can be done on a Cache so that it is easier to reason about what @@ -57,9 +56,9 @@ public class MapCacheBroker implements CacheBroker private final AtomicLong hitCount = new AtomicLong(0); private final AtomicLong missCount = new AtomicLong(0); - public static CacheBroker create(final MapCacheBrokerConfig config) + public static com.metamx.druid.client.cache.Cache create(final MapCacheConfig config) { - return new MapCacheBroker( + return new MapCache( new ByteCountingLRUMap( config.getInitialSize(), config.getLogEvictionCount(), @@ -68,7 +67,7 @@ public class MapCacheBroker implements CacheBroker ); } - MapCacheBroker( + MapCache( ByteCountingLRUMap byteCountingLRUMap ) { diff --git a/client/src/main/java/com/metamx/druid/client/cache/MapCacheBrokerConfig.java b/client/src/main/java/com/metamx/druid/client/cache/MapCacheConfig.java similarity index 96% rename from client/src/main/java/com/metamx/druid/client/cache/MapCacheBrokerConfig.java rename to client/src/main/java/com/metamx/druid/client/cache/MapCacheConfig.java index 6ff7a778236..2bc468bf899 100644 --- a/client/src/main/java/com/metamx/druid/client/cache/MapCacheBrokerConfig.java +++ b/client/src/main/java/com/metamx/druid/client/cache/MapCacheConfig.java @@ -24,7 +24,7 @@ import org.skife.config.Default; /** */ -public abstract class MapCacheBrokerConfig +public abstract class MapCacheConfig { @Config("${prefix}.sizeInBytes") @Default("0") diff --git a/client/src/main/java/com/metamx/druid/client/cache/MemcachedCacheBroker.java b/client/src/main/java/com/metamx/druid/client/cache/MemcachedCache.java similarity index 95% rename from client/src/main/java/com/metamx/druid/client/cache/MemcachedCacheBroker.java rename to client/src/main/java/com/metamx/druid/client/cache/MemcachedCache.java index fea391f7912..86cbf5153e5 100644 --- a/client/src/main/java/com/metamx/druid/client/cache/MemcachedCacheBroker.java +++ b/client/src/main/java/com/metamx/druid/client/cache/MemcachedCache.java @@ -41,16 +41,16 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; -public class MemcachedCacheBroker implements CacheBroker +public class MemcachedCache implements Cache { - public static MemcachedCacheBroker create(final MemcachedCacheBrokerConfig config) + public static MemcachedCache create(final MemcachedCacheConfig config) { try { SerializingTranscoder transcoder = new SerializingTranscoder(config.getMaxObjectSize()); // disable compression transcoder.setCompressionThreshold(Integer.MAX_VALUE); - return new MemcachedCacheBroker( + return new MemcachedCache( new MemcachedClient( new ConnectionFactoryBuilder().setProtocol(ConnectionFactoryBuilder.Protocol.BINARY) .setHashAlg(DefaultHashAlgorithm.FNV1A_64_HASH) @@ -79,7 +79,7 @@ public class MemcachedCacheBroker implements CacheBroker private final AtomicLong missCount = new AtomicLong(0); private final AtomicLong timeoutCount = new AtomicLong(0); - MemcachedCacheBroker(MemcachedClientIF client, int timeout, int expiration) { + MemcachedCache(MemcachedClientIF client, int timeout, int expiration) { this.timeout = timeout; this.expiration = expiration; this.client = client; diff --git a/client/src/main/java/com/metamx/druid/client/cache/MemcachedCacheBrokerConfig.java b/client/src/main/java/com/metamx/druid/client/cache/MemcachedCacheConfig.java similarity index 89% rename from client/src/main/java/com/metamx/druid/client/cache/MemcachedCacheBrokerConfig.java rename to client/src/main/java/com/metamx/druid/client/cache/MemcachedCacheConfig.java index 5799d739bb6..83f626d8641 100644 --- a/client/src/main/java/com/metamx/druid/client/cache/MemcachedCacheBrokerConfig.java +++ b/client/src/main/java/com/metamx/druid/client/cache/MemcachedCacheConfig.java @@ -3,7 +3,7 @@ package com.metamx.druid.client.cache; import org.skife.config.Config; import org.skife.config.Default; -public abstract class MemcachedCacheBrokerConfig +public abstract class MemcachedCacheConfig { @Config("${prefix}.expiration") @Default("31536000") diff --git a/client/src/main/java/com/metamx/druid/http/BrokerNode.java b/client/src/main/java/com/metamx/druid/http/BrokerNode.java index aa14b440104..2a94a00d76d 100644 --- a/client/src/main/java/com/metamx/druid/http/BrokerNode.java +++ b/client/src/main/java/com/metamx/druid/http/BrokerNode.java @@ -34,13 +34,13 @@ import com.metamx.druid.client.BrokerServerView; import com.metamx.druid.client.CachingClusteredClient; import com.metamx.druid.client.ClientConfig; import com.metamx.druid.client.ClientInventoryManager; -import com.metamx.druid.client.cache.CacheBroker; +import com.metamx.druid.client.cache.Cache; import com.metamx.druid.client.cache.CacheConfig; import com.metamx.druid.client.cache.CacheMonitor; -import com.metamx.druid.client.cache.MapCacheBroker; -import com.metamx.druid.client.cache.MapCacheBrokerConfig; -import com.metamx.druid.client.cache.MemcachedCacheBroker; -import com.metamx.druid.client.cache.MemcachedCacheBrokerConfig; +import com.metamx.druid.client.cache.MapCache; +import com.metamx.druid.client.cache.MapCacheConfig; +import com.metamx.druid.client.cache.MemcachedCache; +import com.metamx.druid.client.cache.MemcachedCacheConfig; import com.metamx.druid.initialization.Initialization; import com.metamx.druid.initialization.ServiceDiscoveryConfig; import com.metamx.druid.jackson.DefaultObjectMapper; @@ -78,7 +78,7 @@ public class BrokerNode extends QueryableNode private QueryToolChestWarehouse warehouse = null; private HttpClient brokerHttpClient = null; - private CacheBroker cacheBroker = null; + private Cache cache = null; private boolean useDiscovery = true; @@ -122,15 +122,15 @@ public class BrokerNode extends QueryableNode return this; } - public CacheBroker getCacheBroker() + public Cache getCache() { initializeCacheBroker(); - return cacheBroker; + return cache; } - public BrokerNode setCacheBroker(CacheBroker cacheBroker) + public BrokerNode setCache(Cache cache) { - checkFieldNotSetAndSet("cacheBroker", cacheBroker); + checkFieldNotSetAndSet("cache", cache); return this; } @@ -185,7 +185,7 @@ public class BrokerNode extends QueryableNode final Lifecycle lifecycle = getLifecycle(); final List monitors = getMonitors(); - monitors.add(new CacheMonitor(cacheBroker)); + monitors.add(new CacheMonitor(cache)); startMonitoring(monitors); final BrokerServerView view = new BrokerServerView(warehouse, getSmileMapper(), brokerHttpClient); @@ -194,7 +194,7 @@ public class BrokerNode extends QueryableNode ); lifecycle.addManagedInstance(clientInventoryManager); - final CachingClusteredClient baseClient = new CachingClusteredClient(warehouse, view, cacheBroker, getSmileMapper()); + final CachingClusteredClient baseClient = new CachingClusteredClient(warehouse, view, cache, getSmileMapper()); lifecycle.addManagedInstance(baseClient); @@ -239,25 +239,25 @@ public class BrokerNode extends QueryableNode private void initializeCacheBroker() { - if (cacheBroker == null) { + if (cache == null) { String cacheType = getConfigFactory() .build(CacheConfig.class) .getType(); if (cacheType.equals(CACHE_TYPE_LOCAL)) { - setCacheBroker( - MapCacheBroker.create( + setCache( + MapCache.create( getConfigFactory().buildWithReplacements( - MapCacheBrokerConfig.class, + MapCacheConfig.class, ImmutableMap.of("prefix", CACHE_PROPERTY_PREFIX) ) ) ); } else if (cacheType.equals(CACHE_TYPE_MEMCACHED)) { - setCacheBroker( - MemcachedCacheBroker.create( + setCache( + MemcachedCache.create( getConfigFactory().buildWithReplacements( - MemcachedCacheBrokerConfig.class, + MemcachedCacheConfig.class, ImmutableMap.of("prefix", CACHE_PROPERTY_PREFIX) ) ) diff --git a/client/src/test/java/com/metamx/druid/client/cache/MapCacheBrokerTest.java b/client/src/test/java/com/metamx/druid/client/cache/MapCacheBrokerTest.java index 35ea7ac42d9..78f071ca539 100644 --- a/client/src/test/java/com/metamx/druid/client/cache/MapCacheBrokerTest.java +++ b/client/src/test/java/com/metamx/druid/client/cache/MapCacheBrokerTest.java @@ -31,24 +31,24 @@ public class MapCacheBrokerTest private static final byte[] HI = "hi".getBytes(); private static final byte[] HO = "ho".getBytes(); private ByteCountingLRUMap baseMap; - private MapCacheBroker cache; + private MapCache cache; @Before public void setUp() throws Exception { baseMap = new ByteCountingLRUMap(1024 * 1024); - cache = new MapCacheBroker(baseMap); + cache = new MapCache(baseMap); } @Test public void testSanity() throws Exception { - Assert.assertNull(cache.get(new CacheBroker.NamedKey("a", HI))); + Assert.assertNull(cache.get(new Cache.NamedKey("a", HI))); Assert.assertEquals(0, baseMap.size()); put(cache, "a", HI, 1); Assert.assertEquals(1, baseMap.size()); Assert.assertEquals(1, get(cache, "a", HI)); - Assert.assertNull(cache.get(new CacheBroker.NamedKey("the", HI))); + Assert.assertNull(cache.get(new Cache.NamedKey("the", HI))); put(cache, "the", HI, 2); Assert.assertEquals(2, baseMap.size()); @@ -58,26 +58,26 @@ public class MapCacheBrokerTest put(cache, "the", HO, 10); Assert.assertEquals(3, baseMap.size()); Assert.assertEquals(1, get(cache, "a", HI)); - Assert.assertNull(cache.get(new CacheBroker.NamedKey("a", HO))); + Assert.assertNull(cache.get(new Cache.NamedKey("a", HO))); Assert.assertEquals(2, get(cache, "the", HI)); Assert.assertEquals(10, get(cache, "the", HO)); cache.close("the"); Assert.assertEquals(1, baseMap.size()); Assert.assertEquals(1, get(cache, "a", HI)); - Assert.assertNull(cache.get(new CacheBroker.NamedKey("a", HO))); + Assert.assertNull(cache.get(new Cache.NamedKey("a", HO))); cache.close("a"); Assert.assertEquals(0, baseMap.size()); } - public void put(CacheBroker cache, String namespace, byte[] key, Integer value) + public void put(Cache cache, String namespace, byte[] key, Integer value) { - cache.put(new CacheBroker.NamedKey(namespace, key), Ints.toByteArray(value)); + cache.put(new Cache.NamedKey(namespace, key), Ints.toByteArray(value)); } - public int get(CacheBroker cache, String namespace, byte[] key) + public int get(Cache cache, String namespace, byte[] key) { - return Ints.fromByteArray(cache.get(new CacheBroker.NamedKey(namespace, key))); + return Ints.fromByteArray(cache.get(new Cache.NamedKey(namespace, key))); } } diff --git a/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheBrokerBenchmark.java b/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheBrokerBenchmark.java index 7c25f3ee954..d87dfd5f7a1 100644 --- a/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheBrokerBenchmark.java +++ b/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheBrokerBenchmark.java @@ -22,7 +22,7 @@ public class MemcachedCacheBrokerBenchmark extends SimpleBenchmark private static final String BASE_KEY = "test_2012-11-26T00:00:00.000Z_2012-11-27T00:00:00.000Z_2012-11-27T04:11:25.979Z_"; public static final String NAMESPACE = "default"; - private MemcachedCacheBroker cache; + private MemcachedCache cache; private MemcachedClientIF client; private static byte[] randBytes; @@ -54,7 +54,7 @@ public class MemcachedCacheBrokerBenchmark extends SimpleBenchmark AddrUtil.getAddresses(hosts) ); - cache = new MemcachedCacheBroker( + cache = new MemcachedCache( client, 30000, // 30 seconds 3600 // 1 hour @@ -75,7 +75,7 @@ public class MemcachedCacheBrokerBenchmark extends SimpleBenchmark for(int i = 0; i < reps; ++i) { for(int k = 0; k < objectCount; ++k) { String key = BASE_KEY + k; - cache.put(new CacheBroker.NamedKey(NAMESPACE, key.getBytes()), randBytes); + cache.put(new Cache.NamedKey(NAMESPACE, key.getBytes()), randBytes); } // make sure the write queue is empty client.waitForQueues(1, TimeUnit.HOURS); @@ -88,7 +88,7 @@ public class MemcachedCacheBrokerBenchmark extends SimpleBenchmark for (int i = 0; i < reps; i++) { for(int k = 0; k < objectCount; ++k) { String key = BASE_KEY + k; - bytes = cache.get(new CacheBroker.NamedKey(NAMESPACE, key.getBytes())); + bytes = cache.get(new Cache.NamedKey(NAMESPACE, key.getBytes())); count += bytes.length; } } @@ -98,13 +98,13 @@ public class MemcachedCacheBrokerBenchmark extends SimpleBenchmark public long timeBulkGetObjects(int reps) { long count = 0; for (int i = 0; i < reps; i++) { - List keys = Lists.newArrayList(); + List keys = Lists.newArrayList(); for(int k = 0; k < objectCount; ++k) { String key = BASE_KEY + k; - keys.add(new CacheBroker.NamedKey(NAMESPACE, key.getBytes())); + keys.add(new Cache.NamedKey(NAMESPACE, key.getBytes())); } - Map results = cache.getBulk(keys); - for(CacheBroker.NamedKey key : keys) { + Map results = cache.getBulk(keys); + for(Cache.NamedKey key : keys) { byte[] bytes = results.get(key); count += bytes.length; } diff --git a/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheBrokerTest.java b/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheBrokerTest.java index 31030260df7..87c1dcdd9f4 100644 --- a/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheBrokerTest.java +++ b/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheBrokerTest.java @@ -22,7 +22,6 @@ package com.metamx.druid.client.cache; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.primitives.Ints; -import com.metamx.common.Pair; import net.spy.memcached.CASResponse; import net.spy.memcached.CASValue; import net.spy.memcached.CachedData; @@ -38,7 +37,6 @@ import org.junit.Before; import org.junit.Test; import java.net.SocketAddress; -import java.nio.ByteBuffer; import java.util.Collection; import java.util.Iterator; import java.util.Map; @@ -56,22 +54,22 @@ public class MemcachedCacheBrokerTest { private static final byte[] HI = "hi".getBytes(); private static final byte[] HO = "ho".getBytes(); - private MemcachedCacheBroker cache; + private MemcachedCache cache; @Before public void setUp() throws Exception { MemcachedClientIF client = new MockMemcachedClient(); - cache = new MemcachedCacheBroker(client, 500, 3600); + cache = new MemcachedCache(client, 500, 3600); } @Test public void testSanity() throws Exception { - Assert.assertNull(cache.get(new CacheBroker.NamedKey("a", HI))); + Assert.assertNull(cache.get(new Cache.NamedKey("a", HI))); put(cache, "a", HI, 1); Assert.assertEquals(1, get(cache, "a", HI)); - Assert.assertNull(cache.get(new CacheBroker.NamedKey("the", HI))); + Assert.assertNull(cache.get(new Cache.NamedKey("the", HI))); put(cache, "the", HI, 2); Assert.assertEquals(1, get(cache, "a", HI)); @@ -79,13 +77,13 @@ public class MemcachedCacheBrokerTest put(cache, "the", HO, 10); Assert.assertEquals(1, get(cache, "a", HI)); - Assert.assertNull(cache.get(new CacheBroker.NamedKey("a", HO))); + Assert.assertNull(cache.get(new Cache.NamedKey("a", HO))); Assert.assertEquals(2, get(cache, "the", HI)); Assert.assertEquals(10, get(cache, "the", HO)); cache.close("the"); Assert.assertEquals(1, get(cache, "a", HI)); - Assert.assertNull(cache.get(new CacheBroker.NamedKey("a", HO))); + Assert.assertNull(cache.get(new Cache.NamedKey("a", HO))); cache.close("a"); } @@ -93,15 +91,15 @@ public class MemcachedCacheBrokerTest @Test public void testGetBulk() throws Exception { - Assert.assertNull(cache.get(new CacheBroker.NamedKey("the", HI))); + Assert.assertNull(cache.get(new Cache.NamedKey("the", HI))); put(cache, "the", HI, 2); put(cache, "the", HO, 10); - CacheBroker.NamedKey key1 = new CacheBroker.NamedKey("the", HI); - CacheBroker.NamedKey key2 = new CacheBroker.NamedKey("the", HO); + Cache.NamedKey key1 = new Cache.NamedKey("the", HI); + Cache.NamedKey key2 = new Cache.NamedKey("the", HO); - Map result = cache.getBulk( + Map result = cache.getBulk( Lists.newArrayList( key1, key2 @@ -112,14 +110,14 @@ public class MemcachedCacheBrokerTest Assert.assertEquals(10, Ints.fromByteArray(result.get(key2))); } - public void put(CacheBroker cache, String namespace, byte[] key, Integer value) + public void put(Cache cache, String namespace, byte[] key, Integer value) { - cache.put(new CacheBroker.NamedKey(namespace, key), Ints.toByteArray(value)); + cache.put(new Cache.NamedKey(namespace, key), Ints.toByteArray(value)); } - public int get(CacheBroker cache, String namespace, byte[] key) + public int get(Cache cache, String namespace, byte[] key) { - return Ints.fromByteArray(cache.get(new CacheBroker.NamedKey(namespace, key))); + return Ints.fromByteArray(cache.get(new Cache.NamedKey(namespace, key))); } } From 40c0bcad29f5152cff7a880ca124b65aaca41643 Mon Sep 17 00:00:00 2001 From: xvrl Date: Fri, 18 Jan 2013 18:25:51 -0800 Subject: [PATCH 9/9] simplify MapCache --- .../metamx/druid/client/cache/MapCache.java | 147 ++++++------------ 1 file changed, 51 insertions(+), 96 deletions(-) diff --git a/client/src/main/java/com/metamx/druid/client/cache/MapCache.java b/client/src/main/java/com/metamx/druid/client/cache/MapCache.java index 2780acd0f9e..53e1e20280a 100644 --- a/client/src/main/java/com/metamx/druid/client/cache/MapCache.java +++ b/client/src/main/java/com/metamx/druid/client/cache/MapCache.java @@ -21,7 +21,6 @@ package com.metamx.druid.client.cache; import com.google.common.collect.Maps; import com.google.common.primitives.Ints; -import com.metamx.common.ISE; import java.nio.ByteBuffer; import java.util.Collections; @@ -34,21 +33,10 @@ import java.util.concurrent.atomic.AtomicLong; */ public class MapCache implements Cache { - /** - * An interface to limit the operations that can be done on a Cache so that it is easier to reason about what - * is actually going to be done. - */ - public interface Cache - { - public byte[] get(byte[] key); - public void put(byte[] key, byte[] value); - public void close(); - } - private final Map baseMap; private final ByteCountingLRUMap byteCountingLRUMap; - private final Map cacheCache; + private final Map namespaceId; private final AtomicInteger ids; private final Object clearLock = new Object(); @@ -75,7 +63,7 @@ public class MapCache implements Cache this.baseMap = Collections.synchronizedMap(byteCountingLRUMap); - cacheCache = Maps.newHashMap(); + namespaceId = Maps.newHashMap(); ids = new AtomicInteger(); } @@ -95,22 +83,29 @@ public class MapCache implements Cache @Override public byte[] get(NamedKey key) { - return provideCache(key.namespace).get(key.key); + final byte[] retVal = baseMap.get(computeKey(getNamespaceId(key.namespace), key.key)); + if (retVal == null) { + missCount.incrementAndGet(); + } else { + hitCount.incrementAndGet(); + } + return retVal; } @Override public void put(NamedKey key, byte[] value) { - provideCache(key.namespace).put(key.key, value); + synchronized (clearLock) { + baseMap.put(computeKey(getNamespaceId(key.namespace), key.key), value); + } } @Override public Map getBulk(Iterable keys) { Map retVal = Maps.newHashMap(); - for(NamedKey key : keys) { - retVal.put(key, provideCache(key.namespace).get(key.key)); + retVal.put(key, get(key)); } return retVal; } @@ -118,86 +113,46 @@ public class MapCache implements Cache @Override public void close(String namespace) { - provideCache(namespace).close(); - } + byte[] idBytes; + synchronized (namespaceId) { + idBytes = getNamespaceId(namespace); + if(idBytes == null) return; - private Cache provideCache(final String identifier) - { - synchronized (cacheCache) { - final Cache cachedCache = cacheCache.get(identifier); - if (cachedCache != null) { - return cachedCache; + namespaceId.remove(namespace); + } + synchronized (clearLock) { + Iterator iter = baseMap.keySet().iterator(); + while (iter.hasNext()) { + ByteBuffer next = iter.next(); + + if (next.get(0) == idBytes[0] + && next.get(1) == idBytes[1] + && next.get(2) == idBytes[2] + && next.get(3) == idBytes[3]) { + iter.remove(); + } } - - final byte[] myIdBytes = Ints.toByteArray(ids.getAndIncrement()); - - final Cache theCache = new Cache() - { - volatile boolean open = true; - - @Override - public byte[] get(byte[] key) - { - if (open) { - final byte[] retVal = baseMap.get(computeKey(key)); - if (retVal == null) { - missCount.incrementAndGet(); - } else { - hitCount.incrementAndGet(); - } - return retVal; - } - throw new ISE("Cache for namespace[%s] is closed.", identifier); - } - - @Override - public void put(byte[] key, byte[] value) - { - synchronized (clearLock) { - if (open) { - baseMap.put(computeKey(key), value); - return; - } - } - throw new ISE("Cache for namespace[%s] is closed.", identifier); - } - - @Override - public void close() - { - synchronized (cacheCache) { - cacheCache.remove(identifier); - } - synchronized (clearLock) { - if (open) { - open = false; - - Iterator iter = baseMap.keySet().iterator(); - while (iter.hasNext()) { - ByteBuffer next = iter.next(); - - if (next.get(0) == myIdBytes[0] - && next.get(1) == myIdBytes[1] - && next.get(2) == myIdBytes[2] - && next.get(3) == myIdBytes[3]) { - iter.remove(); - } - } - } - } - } - - private ByteBuffer computeKey(byte[] key) - { - final ByteBuffer retVal = ByteBuffer.allocate(key.length + 4).put(myIdBytes).put(key); - retVal.rewind(); - return retVal; - } - }; - - cacheCache.put(identifier, theCache); - - return theCache; } } + + private byte[] getNamespaceId(final String identifier) + { + synchronized (namespaceId) { + byte[] idBytes = namespaceId.get(identifier); + if (idBytes != null) { + return idBytes; + } + + idBytes = Ints.toByteArray(ids.getAndIncrement()); + namespaceId.put(identifier, idBytes); + return idBytes; + } + } + + private ByteBuffer computeKey(byte[] idBytes, byte[] key) + { + final ByteBuffer retVal = ByteBuffer.allocate(key.length + 4).put(idBytes).put(key); + retVal.rewind(); + return retVal; + } }