implement memcached CacheBroker

This commit is contained in:
xvrl 2012-12-03 16:23:11 -08:00
parent 5e8bd15df0
commit a03f8527ea
7 changed files with 750 additions and 6 deletions

View File

@ -170,6 +170,11 @@
<groupId>com.github.sgroschupf</groupId> <groupId>com.github.sgroschupf</groupId>
<artifactId>zkclient</artifactId> <artifactId>zkclient</artifactId>
</dependency> </dependency>
<dependency>
<groupId>com.google.code.simple-spring-memcached</groupId>
<artifactId>spymemcached</artifactId>
<version>2.8.4</version>
</dependency>
<!-- Tests --> <!-- Tests -->
<dependency> <dependency>

View File

@ -26,6 +26,6 @@ package com.metamx.druid.client.cache;
public interface Cache public interface Cache
{ {
public byte[] get(byte[] key); public byte[] get(byte[] key);
public byte[] put(byte[] key, byte[] value); public void put(byte[] key, byte[] value);
public void close(); public void close();
} }

View File

@ -28,13 +28,15 @@ public class CacheStats
private final long size; private final long size;
private final long sizeInBytes; private final long sizeInBytes;
private final long numEvictions; private final long numEvictions;
private final long numTimeouts;
public CacheStats( public CacheStats(
long numHits, long numHits,
long numMisses, long numMisses,
long size, long size,
long sizeInBytes, long sizeInBytes,
long numEvictions long numEvictions,
long numTimeouts
) )
{ {
this.numHits = numHits; this.numHits = numHits;
@ -42,6 +44,7 @@ public class CacheStats
this.size = size; this.size = size;
this.sizeInBytes = sizeInBytes; this.sizeInBytes = sizeInBytes;
this.numEvictions = numEvictions; this.numEvictions = numEvictions;
this.numTimeouts = numTimeouts;
} }
public long getNumHits() public long getNumHits()
@ -69,6 +72,11 @@ public class CacheStats
return numEvictions; return numEvictions;
} }
public long getNumTimeouts()
{
return numTimeouts;
}
public long numLookups() public long numLookups()
{ {
return numHits + numMisses; return numHits + numMisses;
@ -95,7 +103,8 @@ public class CacheStats
numMisses - oldStats.numMisses, numMisses - oldStats.numMisses,
size - oldStats.size, size - oldStats.size,
sizeInBytes - oldStats.sizeInBytes, sizeInBytes - oldStats.sizeInBytes,
numEvictions - oldStats.numEvictions numEvictions - oldStats.numEvictions,
numTimeouts - oldStats.numTimeouts
); );
} }
} }

View File

@ -77,7 +77,8 @@ public class MapCacheBroker implements CacheBroker
missCount.get(), missCount.get(),
byteCountingLRUMap.size(), byteCountingLRUMap.size(),
byteCountingLRUMap.getNumBytes(), byteCountingLRUMap.getNumBytes(),
byteCountingLRUMap.getEvictionCount() byteCountingLRUMap.getEvictionCount(),
0
); );
} }
@ -112,11 +113,12 @@ public class MapCacheBroker implements CacheBroker
} }
@Override @Override
public byte[] put(byte[] key, byte[] value) public void put(byte[] key, byte[] value)
{ {
synchronized (clearLock) { synchronized (clearLock) {
if (open) { if (open) {
return baseMap.put(computeKey(key), value); baseMap.put(computeKey(key), value);
return;
} }
} }
throw new ISE("Cache for identifier[%s] is closed.", identifier); throw new ISE("Cache for identifier[%s] is closed.", identifier);

View File

@ -0,0 +1,111 @@
package com.metamx.druid.client.cache;
import net.iharder.base64.Base64;
import net.spy.memcached.AddrUtil;
import net.spy.memcached.ConnectionFactoryBuilder;
import net.spy.memcached.MemcachedClient;
import net.spy.memcached.MemcachedClientIF;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
public class MemcachedCacheBroker implements CacheBroker
{
public static CacheBroker create(final MemcachedCacheBrokerConfig config)
{
try {
return new MemcachedCacheBroker(
new MemcachedClient(
new ConnectionFactoryBuilder().setProtocol(ConnectionFactoryBuilder.Protocol.BINARY).build(),
AddrUtil.getAddresses(config.getHosts())
),
config.getTimeout(),
config.getExpiration()
);
} catch(IOException e) {
throw new RuntimeException(e);
}
}
private final int timeout;
private final int expiration;
private final MemcachedClientIF client;
private final AtomicLong hitCount = new AtomicLong(0);
private final AtomicLong missCount = new AtomicLong(0);
private final AtomicLong timeoutCount = new AtomicLong(0);
MemcachedCacheBroker(MemcachedClientIF client, int timeout, int expiration) {
this.timeout = timeout;
this.expiration = expiration;
this.client = client;
}
@Override
public CacheStats getStats()
{
return new CacheStats(
hitCount.get(),
missCount.get(),
0,
0,
0,
timeoutCount.get()
);
}
@Override
public Cache provideCache(final String identifier)
{
return new Cache()
{
@Override
public byte[] get(byte[] key)
{
Future<Object> future = client.asyncGet(computeKey(identifier, key));
try {
byte[] bytes = (byte[]) future.get(timeout, TimeUnit.MILLISECONDS);
if(bytes != null) {
hitCount.incrementAndGet();
}
else {
missCount.incrementAndGet();
}
return bytes;
}
catch(TimeoutException e) {
timeoutCount.incrementAndGet();
future.cancel(false);
return null;
}
catch(InterruptedException e) {
return null;
}
catch(ExecutionException e) {
return null;
}
}
@Override
public void put(byte[] key, byte[] value)
{
client.set(computeKey(identifier, key), expiration, value);
}
@Override
public void close()
{
// no resources to cleanup
}
};
}
private String computeKey(String identifier, byte[] key) {
return identifier + Base64.encodeBytes(key);
}
}

View File

@ -0,0 +1,18 @@
package com.metamx.druid.client.cache;
import org.skife.config.Config;
import org.skife.config.Default;
public abstract class MemcachedCacheBrokerConfig
{
@Config("${prefix}.initialSize")
@Default("31536000")
public abstract int getExpiration();
@Config("${prefix}.timeout")
@Default("500")
public abstract int getTimeout();
@Config("${prefix}.hosts")
public abstract String getHosts();
}

View File

@ -0,0 +1,599 @@
package com.metamx.druid.client.cache;
import com.google.common.primitives.Ints;
import net.spy.memcached.CASResponse;
import net.spy.memcached.CASValue;
import net.spy.memcached.CachedData;
import net.spy.memcached.ConnectionObserver;
import net.spy.memcached.MemcachedClientIF;
import net.spy.memcached.NodeLocator;
import net.spy.memcached.internal.BulkFuture;
import net.spy.memcached.transcoders.SerializingTranscoder;
import net.spy.memcached.transcoders.Transcoder;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.net.SocketAddress;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
*/
public class MemcachedBrokerTest
{
private static final byte[] HI = "hi".getBytes();
private static final byte[] HO = "ho".getBytes();
private MemcachedCacheBroker broker;
@Before
public void setUp() throws Exception
{
MemcachedClientIF client = new MockMemcachedClient();
broker = new MemcachedCacheBroker(client, 500, 3600);
}
@Test
public void testSanity() throws Exception
{
Cache aCache = broker.provideCache("a");
Cache theCache = broker.provideCache("the");
Assert.assertNull(aCache.get(HI));
put(aCache, HI, 1);
Assert.assertEquals(1, get(aCache, HI));
Assert.assertNull(theCache.get(HI));
put(theCache, HI, 2);
Assert.assertEquals(1, get(aCache, HI));
Assert.assertEquals(2, get(theCache, HI));
put(theCache, HO, 10);
Assert.assertEquals(1, get(aCache, HI));
Assert.assertNull(aCache.get(HO));
Assert.assertEquals(2, get(theCache, HI));
Assert.assertEquals(10, get(theCache, HO));
theCache.close();
Assert.assertEquals(1, get(aCache, HI));
Assert.assertNull(aCache.get(HO));
aCache.close();
}
public void put(Cache cache, byte[] key, Integer value)
{
cache.put(key, Ints.toByteArray(value));
}
public int get(Cache cache, byte[] key)
{
return Ints.fromByteArray(cache.get(key));
}
}
class MockMemcachedClient implements MemcachedClientIF
{
private final ConcurrentMap<String, CachedData> theMap = new ConcurrentHashMap<String, CachedData>();
private final Transcoder<Object> transcoder = new SerializingTranscoder();
@Override
public Collection<SocketAddress> getAvailableServers()
{
throw new UnsupportedOperationException("not implemented");
}
@Override
public Collection<SocketAddress> getUnavailableServers()
{
throw new UnsupportedOperationException("not implemented");
}
@Override
public Transcoder<Object> getTranscoder()
{
throw new UnsupportedOperationException("not implemented");
}
@Override
public NodeLocator getNodeLocator()
{
throw new UnsupportedOperationException("not implemented");
}
@Override
public Future<Boolean> append(long cas, String key, Object val)
{
throw new UnsupportedOperationException("not implemented");
}
@Override
public <T> Future<Boolean> append(long cas, String key, T val, Transcoder<T> tc)
{
throw new UnsupportedOperationException("not implemented");
}
@Override
public Future<Boolean> prepend(long cas, String key, Object val)
{
throw new UnsupportedOperationException("not implemented");
}
@Override
public <T> Future<Boolean> prepend(long cas, String key, T val, Transcoder<T> tc)
{
throw new UnsupportedOperationException("not implemented");
}
@Override
public <T> Future<CASResponse> asyncCAS(String key, long casId, T value, Transcoder<T> tc)
{
throw new UnsupportedOperationException("not implemented");
}
@Override
public Future<CASResponse> asyncCAS(String key, long casId, Object value)
{
throw new UnsupportedOperationException("not implemented");
}
@Override
public <T> CASResponse cas(String key, long casId, int exp, T value, Transcoder<T> tc)
{
throw new UnsupportedOperationException("not implemented");
}
@Override
public CASResponse cas(String key, long casId, Object value)
{
throw new UnsupportedOperationException("not implemented");
}
@Override
public <T> Future<Boolean> add(String key, int exp, T o, Transcoder<T> tc)
{
throw new UnsupportedOperationException("not implemented");
}
@Override
public Future<Boolean> add(String key, int exp, Object o)
{
throw new UnsupportedOperationException("not implemented");
}
@Override
public <T> Future<Boolean> set(String key, int exp, T o, Transcoder<T> tc)
{
theMap.put(key, tc.encode(o));
return new Future<Boolean>()
{
@Override
public boolean cancel(boolean b)
{
return false;
}
@Override
public boolean isCancelled()
{
return false;
}
@Override
public boolean isDone()
{
return true;
}
@Override
public Boolean get() throws InterruptedException, ExecutionException
{
return true;
}
@Override
public Boolean get(long l, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException
{
return true;
}
};
}
@Override
public Future<Boolean> set(String key, int exp, Object o)
{
return set(key, exp, o, transcoder);
}
@Override
public <T> Future<Boolean> replace(String key, int exp, T o, Transcoder<T> tc)
{
throw new UnsupportedOperationException("not implemented");
}
@Override
public Future<Boolean> replace(String key, int exp, Object o)
{
throw new UnsupportedOperationException("not implemented");
}
@Override
public <T> Future<T> asyncGet(String key, final Transcoder<T> tc)
{
CachedData data = theMap.get(key);
final T theValue = data != null ? tc.decode(data) : null;
return new Future<T>()
{
@Override
public boolean cancel(boolean b)
{
return false;
}
@Override
public boolean isCancelled()
{
return false;
}
@Override
public boolean isDone()
{
return true;
}
@Override
public T get() throws InterruptedException, ExecutionException
{
return theValue;
}
@Override
public T get(long l, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException
{
return theValue;
}
};
}
@Override
public Future<Object> asyncGet(String key)
{
return asyncGet(key, transcoder);
}
@Override
public Future<CASValue<Object>> asyncGetAndTouch(String key, int exp)
{
throw new UnsupportedOperationException("not implemented");
}
@Override
public <T> Future<CASValue<T>> asyncGetAndTouch(String key, int exp, Transcoder<T> tc)
{
throw new UnsupportedOperationException("not implemented");
}
@Override
public CASValue<Object> getAndTouch(String key, int exp)
{
throw new UnsupportedOperationException("not implemented");
}
@Override
public <T> CASValue<T> getAndTouch(String key, int exp, Transcoder<T> tc)
{
throw new UnsupportedOperationException("not implemented");
}
@Override
public <T> Future<CASValue<T>> asyncGets(String key, Transcoder<T> tc)
{
throw new UnsupportedOperationException("not implemented");
}
@Override
public Future<CASValue<Object>> asyncGets(String key)
{
throw new UnsupportedOperationException("not implemented");
}
@Override
public <T> CASValue<T> gets(String key, Transcoder<T> tc)
{
throw new UnsupportedOperationException("not implemented");
}
@Override
public CASValue<Object> gets(String key)
{
throw new UnsupportedOperationException("not implemented");
}
@Override
public <T> T get(String key, Transcoder<T> tc)
{
CachedData data = theMap.get(key);
return data != null ? tc.decode(data) : null;
}
@Override
public Object get(String key)
{
return get(key, transcoder);
}
@Override
public <T> BulkFuture<Map<String, T>> asyncGetBulk(Iterator<String> keys, Iterator<Transcoder<T>> tcs)
{
throw new UnsupportedOperationException("not implemented");
}
@Override
public <T> BulkFuture<Map<String, T>> asyncGetBulk(Collection<String> keys, Iterator<Transcoder<T>> tcs)
{
throw new UnsupportedOperationException("not implemented");
}
@Override
public <T> BulkFuture<Map<String, T>> asyncGetBulk(Iterator<String> keys, Transcoder<T> tc)
{
throw new UnsupportedOperationException("not implemented");
}
@Override
public <T> BulkFuture<Map<String, T>> asyncGetBulk(Collection<String> keys, Transcoder<T> tc)
{
throw new UnsupportedOperationException("not implemented");
}
@Override
public BulkFuture<Map<String, Object>> asyncGetBulk(Iterator<String> keys)
{
throw new UnsupportedOperationException("not implemented");
}
@Override
public BulkFuture<Map<String, Object>> asyncGetBulk(Collection<String> keys)
{
throw new UnsupportedOperationException("not implemented");
}
@Override
public <T> BulkFuture<Map<String, T>> asyncGetBulk(Transcoder<T> tc, String... keys)
{
throw new UnsupportedOperationException("not implemented");
}
@Override
public BulkFuture<Map<String, Object>> asyncGetBulk(String... keys)
{
throw new UnsupportedOperationException("not implemented");
}
@Override
public <T> Map<String, T> getBulk(Iterator<String> keys, Transcoder<T> tc)
{
throw new UnsupportedOperationException("not implemented");
}
@Override
public <T> Map<String, T> getBulk(Collection<String> keys, Transcoder<T> tc)
{
throw new UnsupportedOperationException("not implemented");
}
@Override
public Map<String, Object> getBulk(Iterator<String> keys)
{
throw new UnsupportedOperationException("not implemented");
}
@Override
public Map<String, Object> getBulk(Collection<String> keys)
{
throw new UnsupportedOperationException("not implemented");
}
@Override
public <T> Map<String, T> getBulk(Transcoder<T> tc, String... keys)
{
throw new UnsupportedOperationException("not implemented");
}
@Override
public Map<String, Object> getBulk(String... keys)
{
throw new UnsupportedOperationException("not implemented");
}
@Override
public <T> Future<Boolean> touch(String key, int exp, Transcoder<T> tc)
{
throw new UnsupportedOperationException("not implemented");
}
@Override
public <T> Future<Boolean> touch(String key, int exp)
{
throw new UnsupportedOperationException("not implemented");
}
@Override
public Map<SocketAddress, String> getVersions()
{
throw new UnsupportedOperationException("not implemented");
}
@Override
public Map<SocketAddress, Map<String, String>> getStats()
{
throw new UnsupportedOperationException("not implemented");
}
@Override
public Map<SocketAddress, Map<String, String>> getStats(String prefix)
{
throw new UnsupportedOperationException("not implemented");
}
@Override
public long incr(String key, long by)
{
throw new UnsupportedOperationException("not implemented");
}
@Override
public long incr(String key, int by)
{
throw new UnsupportedOperationException("not implemented");
}
@Override
public long decr(String key, long by)
{
throw new UnsupportedOperationException("not implemented");
}
@Override
public long decr(String key, int by)
{
throw new UnsupportedOperationException("not implemented");
}
@Override
public long incr(String key, long by, long def, int exp)
{
throw new UnsupportedOperationException("not implemented");
}
@Override
public long incr(String key, int by, long def, int exp)
{
throw new UnsupportedOperationException("not implemented");
}
@Override
public long decr(String key, long by, long def, int exp)
{
throw new UnsupportedOperationException("not implemented");
}
@Override
public long decr(String key, int by, long def, int exp)
{
throw new UnsupportedOperationException("not implemented");
}
@Override
public Future<Long> asyncIncr(String key, long by)
{
throw new UnsupportedOperationException("not implemented");
}
@Override
public Future<Long> asyncIncr(String key, int by)
{
throw new UnsupportedOperationException("not implemented");
}
@Override
public Future<Long> asyncDecr(String key, long by)
{
throw new UnsupportedOperationException("not implemented");
}
@Override
public Future<Long> asyncDecr(String key, int by)
{
throw new UnsupportedOperationException("not implemented");
}
@Override
public long incr(String key, long by, long def)
{
throw new UnsupportedOperationException("not implemented");
}
@Override
public long incr(String key, int by, long def)
{
throw new UnsupportedOperationException("not implemented");
}
@Override
public long decr(String key, long by, long def)
{
throw new UnsupportedOperationException("not implemented");
}
@Override
public long decr(String key, int by, long def)
{
throw new UnsupportedOperationException("not implemented");
}
@Override
public Future<Boolean> delete(String key)
{
throw new UnsupportedOperationException("not implemented");
}
@Override
public Future<Boolean> flush(int delay)
{
throw new UnsupportedOperationException("not implemented");
}
@Override
public Future<Boolean> flush()
{
throw new UnsupportedOperationException("not implemented");
}
@Override
public void shutdown()
{
throw new UnsupportedOperationException("not implemented");
}
@Override
public boolean shutdown(long timeout, TimeUnit unit)
{
throw new UnsupportedOperationException("not implemented");
}
@Override
public boolean waitForQueues(long timeout, TimeUnit unit)
{
throw new UnsupportedOperationException("not implemented");
}
@Override
public boolean addObserver(ConnectionObserver obs)
{
throw new UnsupportedOperationException("not implemented");
}
@Override
public boolean removeObserver(ConnectionObserver obs)
{
throw new UnsupportedOperationException("not implemented");
}
@Override
public Set<String> listSaslMechanisms()
{
throw new UnsupportedOperationException("not implemented");
}
};