From a03f8527ea27d5a0862eedf901f8fb50f4125824 Mon Sep 17 00:00:00 2001
From: xvrl
Date: Mon, 3 Dec 2012 16:23:11 -0800
Subject: [PATCH] implement memcached CacheBroker

---
 client/pom.xml                                     |   5 +
 .../com/metamx/druid/client/cache/Cache.java       |   2 +-
 .../metamx/druid/client/cache/CacheStats.java      |  13 +-
 .../druid/client/cache/MapCacheBroker.java         |   8 +-
 .../client/cache/MemcachedCacheBroker.java         | 111 ++++
 .../cache/MemcachedCacheBrokerConfig.java          |  18 +
 .../client/cache/MemcachedBrokerTest.java          | 599 ++++++++++++++++++
 7 files changed, 750 insertions(+), 6 deletions(-)
 create mode 100644 client/src/main/java/com/metamx/druid/client/cache/MemcachedCacheBroker.java
 create mode 100644 client/src/main/java/com/metamx/druid/client/cache/MemcachedCacheBrokerConfig.java
 create mode 100644 client/src/test/java/com/metamx/druid/client/cache/MemcachedBrokerTest.java

diff --git a/client/pom.xml b/client/pom.xml
index 5db815ff52e..eb5f801b9d3 100644
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -170,6 +170,11 @@
         <dependency>
             <groupId>com.github.sgroschupf</groupId>
            <artifactId>zkclient</artifactId>
         </dependency>
+        <dependency>
+            <groupId>com.google.code.simple-spring-memcached</groupId>
+            <artifactId>spymemcached</artifactId>
+            <version>2.8.4</version>
+        </dependency>
     </dependencies>
 
diff --git a/client/src/main/java/com/metamx/druid/client/cache/Cache.java b/client/src/main/java/com/metamx/druid/client/cache/Cache.java
index a9e9ebdb12c..e7907c9548f 100644
--- a/client/src/main/java/com/metamx/druid/client/cache/Cache.java
+++ b/client/src/main/java/com/metamx/druid/client/cache/Cache.java
@@ -26,6 +26,6 @@ package com.metamx.druid.client.cache;
 public interface Cache
 {
   public byte[] get(byte[] key);
-  public byte[] put(byte[] key, byte[] value);
+  public void put(byte[] key, byte[] value);
   public void close();
 }
diff --git a/client/src/main/java/com/metamx/druid/client/cache/CacheStats.java b/client/src/main/java/com/metamx/druid/client/cache/CacheStats.java
index 822a78fbbc1..1a9950c8698 100644
--- a/client/src/main/java/com/metamx/druid/client/cache/CacheStats.java
+++ b/client/src/main/java/com/metamx/druid/client/cache/CacheStats.java
@@ -28,13 +28,15 @@ public class CacheStats
   private final long size;
   private final long sizeInBytes;
   private final long numEvictions;
+  private final long numTimeouts;
 
   public CacheStats(
       long numHits,
       long numMisses,
       long size,
       long sizeInBytes,
-      long numEvictions
+      long numEvictions,
+      long numTimeouts
   )
   {
     this.numHits = numHits;
@@ -42,6 +44,7 @@ public class CacheStats
     this.size = size;
     this.sizeInBytes = sizeInBytes;
     this.numEvictions = numEvictions;
+    this.numTimeouts = numTimeouts;
   }
 
   public long getNumHits()
@@ -69,6 +72,11 @@ public class CacheStats
     return numEvictions;
   }
 
+  public long getNumTimeouts()
+  {
+    return numTimeouts;
+  }
+
   public long numLookups()
   {
     return numHits + numMisses;
@@ -95,7 +103,8 @@ public class CacheStats
         numMisses - oldStats.numMisses,
         size - oldStats.size,
         sizeInBytes - oldStats.sizeInBytes,
-        numEvictions - oldStats.numEvictions
+        numEvictions - oldStats.numEvictions,
+        numTimeouts - oldStats.numTimeouts
     );
   }
 }
diff --git a/client/src/main/java/com/metamx/druid/client/cache/MapCacheBroker.java b/client/src/main/java/com/metamx/druid/client/cache/MapCacheBroker.java
index 943c5aae154..d8ec202021a 100644
--- a/client/src/main/java/com/metamx/druid/client/cache/MapCacheBroker.java
+++ b/client/src/main/java/com/metamx/druid/client/cache/MapCacheBroker.java
@@ -77,7 +77,8 @@ public class MapCacheBroker implements CacheBroker
         missCount.get(),
         byteCountingLRUMap.size(),
         byteCountingLRUMap.getNumBytes(),
-        byteCountingLRUMap.getEvictionCount()
+        byteCountingLRUMap.getEvictionCount(),
+        0
     );
   }
 
@@ -112,11 +113,12 @@ public class MapCacheBroker implements CacheBroker
       }
 
       @Override
-      public byte[] put(byte[] key, byte[] value)
+      public void put(byte[] key, byte[] value)
       {
         synchronized (clearLock) {
           if (open) {
-            return baseMap.put(computeKey(key), value);
+            baseMap.put(computeKey(key), value);
+            return;
           }
         }
         throw new ISE("Cache for identifier[%s] is closed.", identifier);
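Note on the CacheStats changes above: they add a timeout counter alongside the existing hit/miss/eviction counters, and MapCacheBroker simply reports 0 for it. A minimal sketch of how a caller might read the extended stats; the broker variable stands for any CacheBroker and is purely illustrative:

    CacheStats stats = broker.getStats();
    long lookups = stats.numLookups();                 // numHits + numMisses
    double hitRate = lookups == 0 ? 0.0 : (double) stats.getNumHits() / lookups;
    long timeouts = stats.getNumTimeouts();            // new in this patch; always 0 for MapCacheBroker
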
diff --git a/client/src/main/java/com/metamx/druid/client/cache/MemcachedCacheBroker.java b/client/src/main/java/com/metamx/druid/client/cache/MemcachedCacheBroker.java
new file mode 100644
index 00000000000..ffa5faca5cc
--- /dev/null
+++ b/client/src/main/java/com/metamx/druid/client/cache/MemcachedCacheBroker.java
@@ -0,0 +1,111 @@
+package com.metamx.druid.client.cache;
+
+import net.iharder.base64.Base64;
+import net.spy.memcached.AddrUtil;
+import net.spy.memcached.ConnectionFactoryBuilder;
+import net.spy.memcached.MemcachedClient;
+import net.spy.memcached.MemcachedClientIF;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class MemcachedCacheBroker implements CacheBroker
+{
+  public static CacheBroker create(final MemcachedCacheBrokerConfig config)
+  {
+    try {
+      return new MemcachedCacheBroker(
+          new MemcachedClient(
+              new ConnectionFactoryBuilder().setProtocol(ConnectionFactoryBuilder.Protocol.BINARY).build(),
+              AddrUtil.getAddresses(config.getHosts())
+          ),
+          config.getTimeout(),
+          config.getExpiration()
+      );
+    } catch(IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private final int timeout;
+  private final int expiration;
+
+  private final MemcachedClientIF client;
+
+  private final AtomicLong hitCount = new AtomicLong(0);
+  private final AtomicLong missCount = new AtomicLong(0);
+  private final AtomicLong timeoutCount = new AtomicLong(0);
+
+  MemcachedCacheBroker(MemcachedClientIF client, int timeout, int expiration) {
+    this.timeout = timeout;
+    this.expiration = expiration;
+    this.client = client;
+  }
+
+  @Override
+  public CacheStats getStats()
+  {
+    return new CacheStats(
+        hitCount.get(),
+        missCount.get(),
+        0,
+        0,
+        0,
+        timeoutCount.get()
+    );
+  }
+
+  @Override
+  public Cache provideCache(final String identifier)
+  {
+    return new Cache()
+    {
+      @Override
+      public byte[] get(byte[] key)
+      {
+        Future<Object> future = client.asyncGet(computeKey(identifier, key));
+        try {
+          byte[] bytes = (byte[]) future.get(timeout, TimeUnit.MILLISECONDS);
+          if(bytes != null) {
+            hitCount.incrementAndGet();
+          }
+          else {
+            missCount.incrementAndGet();
+          }
+          return bytes;
+        }
+        catch(TimeoutException e) {
+          timeoutCount.incrementAndGet();
+          future.cancel(false);
+          return null;
+        }
+        catch(InterruptedException e) {
+          return null;
+        }
+        catch(ExecutionException e) {
+          return null;
+        }
+      }
+
+      @Override
+      public void put(byte[] key, byte[] value)
+      {
+        client.set(computeKey(identifier, key), expiration, value);
+      }
+
+      @Override
+      public void close()
+      {
+        // no resources to cleanup
+      }
+    };
+  }
+
+  private String computeKey(String identifier, byte[] key) {
+    return identifier + Base64.encodeBytes(key);
+  }
+}
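For reference, a minimal sketch of how the new broker is exercised; the cache identifier and keys are placeholders, and the package-private constructor is called here the same way the unit test below calls it (production callers would presumably go through MemcachedCacheBroker.create(config)):

    // Given a connected MemcachedClientIF (or the MockMemcachedClient from the test below),
    // a 500ms lookup timeout and 3600s expiration, as in MemcachedBrokerTest:
    CacheBroker broker = new MemcachedCacheBroker(client, 500, 3600);

    Cache cache = broker.provideCache("someIdentifier");     // identifier is illustrative
    cache.put("someKey".getBytes(), "someValue".getBytes());
    byte[] value = cache.get("someKey".getBytes());          // null on a miss, a timeout, or a swallowed exception

    CacheStats stats = broker.getStats();
    long timeouts = stats.getNumTimeouts();                  // only hits, misses and timeouts are tracked;
                                                             // size, sizeInBytes and evictions are reported as 0

A null from get() is ambiguous by design: only TimeoutException is counted separately, while InterruptedException and ExecutionException are swallowed without touching any counter.
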
diff --git a/client/src/main/java/com/metamx/druid/client/cache/MemcachedCacheBrokerConfig.java b/client/src/main/java/com/metamx/druid/client/cache/MemcachedCacheBrokerConfig.java
new file mode 100644
index 00000000000..530f88e5c1f
--- /dev/null
+++ b/client/src/main/java/com/metamx/druid/client/cache/MemcachedCacheBrokerConfig.java
@@ -0,0 +1,18 @@
+package com.metamx.druid.client.cache;
+
+import org.skife.config.Config;
+import org.skife.config.Default;
+
+public abstract class MemcachedCacheBrokerConfig
+{
+  @Config("${prefix}.initialSize")
+  @Default("31536000")
+  public abstract int getExpiration();
+
+  @Config("${prefix}.timeout")
+  @Default("500")
+  public abstract int getTimeout();
+
+  @Config("${prefix}.hosts")
+  public abstract String getHosts();
+}
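For reference, a sketch of how this config class might be bound with config-magic's ConfigurationObjectFactory; the property prefix "druid.bard.cache.memcached" and the host values are assumptions, not something this patch defines, and note that as the annotation stands getExpiration() reads "${prefix}.initialSize" (imports assumed: java.util.Properties, org.skife.config.ConfigurationObjectFactory, com.google.common.collect.ImmutableMap):

    // Hypothetical properties; the actual prefix is whatever the caller substitutes for ${prefix}.
    Properties props = new Properties();
    props.setProperty("druid.bard.cache.memcached.hosts", "memcached1:11211,memcached2:11211");
    props.setProperty("druid.bard.cache.memcached.timeout", "500");           // milliseconds, see getTimeout()
    props.setProperty("druid.bard.cache.memcached.initialSize", "31536000");  // bound to getExpiration() as written

    MemcachedCacheBrokerConfig config = new ConfigurationObjectFactory(props).buildWithReplacements(
        MemcachedCacheBrokerConfig.class,
        ImmutableMap.of("prefix", "druid.bard.cache.memcached")
    );
    CacheBroker broker = MemcachedCacheBroker.create(config);

The expiration default of 31536000 seconds is one year; memcached interprets values below 30 days as a relative TTL, larger values as an absolute timestamp.
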
implemented"); + } + + @Override + public Transcoder getTranscoder() + { + throw new UnsupportedOperationException("not implemented"); + } + + @Override + public NodeLocator getNodeLocator() + { + throw new UnsupportedOperationException("not implemented"); + } + + @Override + public Future append(long cas, String key, Object val) + { + throw new UnsupportedOperationException("not implemented"); + } + + @Override + public Future append(long cas, String key, T val, Transcoder tc) + { + throw new UnsupportedOperationException("not implemented"); + } + + @Override + public Future prepend(long cas, String key, Object val) + { + throw new UnsupportedOperationException("not implemented"); + } + + @Override + public Future prepend(long cas, String key, T val, Transcoder tc) + { + throw new UnsupportedOperationException("not implemented"); + } + + @Override + public Future asyncCAS(String key, long casId, T value, Transcoder tc) + { + throw new UnsupportedOperationException("not implemented"); + } + + @Override + public Future asyncCAS(String key, long casId, Object value) + { + throw new UnsupportedOperationException("not implemented"); + } + + @Override + public CASResponse cas(String key, long casId, int exp, T value, Transcoder tc) + { + throw new UnsupportedOperationException("not implemented"); + } + + @Override + public CASResponse cas(String key, long casId, Object value) + { + throw new UnsupportedOperationException("not implemented"); + } + + @Override + public Future add(String key, int exp, T o, Transcoder tc) + { + throw new UnsupportedOperationException("not implemented"); + } + + @Override + public Future add(String key, int exp, Object o) + { + throw new UnsupportedOperationException("not implemented"); + } + + @Override + public Future set(String key, int exp, T o, Transcoder tc) + { + theMap.put(key, tc.encode(o)); + + return new Future() + { + @Override + public boolean cancel(boolean b) + { + return false; + } + + @Override + public boolean isCancelled() + { + return false; + } + + @Override + public boolean isDone() + { + return true; + } + + @Override + public Boolean get() throws InterruptedException, ExecutionException + { + return true; + } + + @Override + public Boolean get(long l, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException + { + return true; + } + }; + } + + @Override + public Future set(String key, int exp, Object o) + { + return set(key, exp, o, transcoder); + } + + @Override + public Future replace(String key, int exp, T o, Transcoder tc) + { + throw new UnsupportedOperationException("not implemented"); + } + + @Override + public Future replace(String key, int exp, Object o) + { + throw new UnsupportedOperationException("not implemented"); + } + + @Override + public Future asyncGet(String key, final Transcoder tc) + { + CachedData data = theMap.get(key); + final T theValue = data != null ? 
+
+    return new Future<T>()
+    {
+      @Override
+      public boolean cancel(boolean b)
+      {
+        return false;
+      }
+
+      @Override
+      public boolean isCancelled()
+      {
+        return false;
+      }
+
+      @Override
+      public boolean isDone()
+      {
+        return true;
+      }
+
+      @Override
+      public T get() throws InterruptedException, ExecutionException
+      {
+        return theValue;
+      }
+
+      @Override
+      public T get(long l, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException
+      {
+        return theValue;
+      }
+    };
+  }
+
+  @Override
+  public Future<Object> asyncGet(String key)
+  {
+    return asyncGet(key, transcoder);
+  }
+
+  @Override
+  public Future<CASValue<Object>> asyncGetAndTouch(String key, int exp)
+  {
+    throw new UnsupportedOperationException("not implemented");
+  }
+
+  @Override
+  public <T> Future<CASValue<T>> asyncGetAndTouch(String key, int exp, Transcoder<T> tc)
+  {
+    throw new UnsupportedOperationException("not implemented");
+  }
+
+  @Override
+  public CASValue<Object> getAndTouch(String key, int exp)
+  {
+    throw new UnsupportedOperationException("not implemented");
+  }
+
+  @Override
+  public <T> CASValue<T> getAndTouch(String key, int exp, Transcoder<T> tc)
+  {
+    throw new UnsupportedOperationException("not implemented");
+  }
+
+  @Override
+  public <T> Future<CASValue<T>> asyncGets(String key, Transcoder<T> tc)
+  {
+    throw new UnsupportedOperationException("not implemented");
+  }
+
+  @Override
+  public Future<CASValue<Object>> asyncGets(String key)
+  {
+    throw new UnsupportedOperationException("not implemented");
+  }
+
+  @Override
+  public <T> CASValue<T> gets(String key, Transcoder<T> tc)
+  {
+    throw new UnsupportedOperationException("not implemented");
+  }
+
+  @Override
+  public CASValue<Object> gets(String key)
+  {
+    throw new UnsupportedOperationException("not implemented");
+  }
+
+  @Override
+  public <T> T get(String key, Transcoder<T> tc)
+  {
+    CachedData data = theMap.get(key);
+    return data != null ? tc.decode(data) : null;
+  }
+
+  @Override
+  public Object get(String key)
+  {
+    return get(key, transcoder);
+  }
+
+  @Override
+  public <T> BulkFuture<Map<String, T>> asyncGetBulk(Iterator<String> keys, Iterator<Transcoder<T>> tcs)
+  {
+    throw new UnsupportedOperationException("not implemented");
+  }
+
+  @Override
+  public <T> BulkFuture<Map<String, T>> asyncGetBulk(Collection<String> keys, Iterator<Transcoder<T>> tcs)
+  {
+    throw new UnsupportedOperationException("not implemented");
+  }
+
+  @Override
+  public <T> BulkFuture<Map<String, T>> asyncGetBulk(Iterator<String> keys, Transcoder<T> tc)
+  {
+    throw new UnsupportedOperationException("not implemented");
+  }
+
+  @Override
+  public <T> BulkFuture<Map<String, T>> asyncGetBulk(Collection<String> keys, Transcoder<T> tc)
+  {
+    throw new UnsupportedOperationException("not implemented");
+  }
+
+  @Override
+  public BulkFuture<Map<String, Object>> asyncGetBulk(Iterator<String> keys)
+  {
+    throw new UnsupportedOperationException("not implemented");
+  }
+
+  @Override
+  public BulkFuture<Map<String, Object>> asyncGetBulk(Collection<String> keys)
+  {
+    throw new UnsupportedOperationException("not implemented");
+  }
+
+  @Override
+  public <T> BulkFuture<Map<String, T>> asyncGetBulk(Transcoder<T> tc, String... keys)
+  {
+    throw new UnsupportedOperationException("not implemented");
+  }
+
+  @Override
+  public BulkFuture<Map<String, Object>> asyncGetBulk(String... keys)
+  {
+    throw new UnsupportedOperationException("not implemented");
+  }
+
+  @Override
+  public <T> Map<String, T> getBulk(Iterator<String> keys, Transcoder<T> tc)
+  {
+    throw new UnsupportedOperationException("not implemented");
+  }
+
+  @Override
+  public <T> Map<String, T> getBulk(Collection<String> keys, Transcoder<T> tc)
+  {
+    throw new UnsupportedOperationException("not implemented");
+  }
+
+  @Override
+  public Map<String, Object> getBulk(Iterator<String> keys)
+  {
+    throw new UnsupportedOperationException("not implemented");
+  }
+
+  @Override
+  public Map<String, Object> getBulk(Collection<String> keys)
+  {
+    throw new UnsupportedOperationException("not implemented");
+  }
+
+  @Override
+  public <T> Map<String, T> getBulk(Transcoder<T> tc, String... keys)
+  {
+    throw new UnsupportedOperationException("not implemented");
+  }
+
+  @Override
+  public Map<String, Object> getBulk(String... keys)
+  {
+    throw new UnsupportedOperationException("not implemented");
+  }
+
+  @Override
+  public <T> Future<Boolean> touch(String key, int exp, Transcoder<T> tc)
+  {
+    throw new UnsupportedOperationException("not implemented");
+  }
+
+  @Override
+  public Future<Boolean> touch(String key, int exp)
+  {
+    throw new UnsupportedOperationException("not implemented");
+  }
+
+  @Override
+  public Map<SocketAddress, String> getVersions()
+  {
+    throw new UnsupportedOperationException("not implemented");
+  }
+
+  @Override
+  public Map<SocketAddress, Map<String, String>> getStats()
+  {
+    throw new UnsupportedOperationException("not implemented");
+  }
+
+  @Override
+  public Map<SocketAddress, Map<String, String>> getStats(String prefix)
+  {
+    throw new UnsupportedOperationException("not implemented");
+  }
+
+  @Override
+  public long incr(String key, long by)
+  {
+    throw new UnsupportedOperationException("not implemented");
+  }
+
+  @Override
+  public long incr(String key, int by)
+  {
+    throw new UnsupportedOperationException("not implemented");
+  }
+
+  @Override
+  public long decr(String key, long by)
+  {
+    throw new UnsupportedOperationException("not implemented");
+  }
+
+  @Override
+  public long decr(String key, int by)
+  {
+    throw new UnsupportedOperationException("not implemented");
+  }
+
+  @Override
+  public long incr(String key, long by, long def, int exp)
+  {
+    throw new UnsupportedOperationException("not implemented");
+  }
+
+  @Override
+  public long incr(String key, int by, long def, int exp)
+  {
+    throw new UnsupportedOperationException("not implemented");
+  }
+
+  @Override
+  public long decr(String key, long by, long def, int exp)
+  {
+    throw new UnsupportedOperationException("not implemented");
+  }
+
+  @Override
+  public long decr(String key, int by, long def, int exp)
+  {
+    throw new UnsupportedOperationException("not implemented");
+  }
+
+  @Override
+  public Future<Long> asyncIncr(String key, long by)
+  {
+    throw new UnsupportedOperationException("not implemented");
+  }
+
+  @Override
+  public Future<Long> asyncIncr(String key, int by)
+  {
+    throw new UnsupportedOperationException("not implemented");
+  }
+
+  @Override
+  public Future<Long> asyncDecr(String key, long by)
+  {
+    throw new UnsupportedOperationException("not implemented");
+  }
+
+  @Override
+  public Future<Long> asyncDecr(String key, int by)
+  {
+    throw new UnsupportedOperationException("not implemented");
+  }
+
+  @Override
+  public long incr(String key, long by, long def)
+  {
+    throw new UnsupportedOperationException("not implemented");
+  }
+
+  @Override
+  public long incr(String key, int by, long def)
+  {
+    throw new UnsupportedOperationException("not implemented");
+  }
+
+  @Override
+  public long decr(String key, long by, long def)
+  {
+    throw new UnsupportedOperationException("not implemented");
+  }
+
+  @Override
+  public long decr(String key, int by, long def)
+  {
+    throw new UnsupportedOperationException("not implemented");
UnsupportedOperationException("not implemented"); + } + + @Override + public Future delete(String key) + { + throw new UnsupportedOperationException("not implemented"); + } + + @Override + public Future flush(int delay) + { + throw new UnsupportedOperationException("not implemented"); + } + + @Override + public Future flush() + { + throw new UnsupportedOperationException("not implemented"); + } + + @Override + public void shutdown() + { + throw new UnsupportedOperationException("not implemented"); + } + + @Override + public boolean shutdown(long timeout, TimeUnit unit) + { + throw new UnsupportedOperationException("not implemented"); + } + + @Override + public boolean waitForQueues(long timeout, TimeUnit unit) + { + throw new UnsupportedOperationException("not implemented"); + } + + @Override + public boolean addObserver(ConnectionObserver obs) + { + throw new UnsupportedOperationException("not implemented"); + } + + @Override + public boolean removeObserver(ConnectionObserver obs) + { + throw new UnsupportedOperationException("not implemented"); + } + + @Override + public Set listSaslMechanisms() + { + throw new UnsupportedOperationException("not implemented"); + } +};