mirror of https://github.com/apache/druid.git
Merge pull request #38 from metamx/memcached-cachebroker
memcached version of CacheBroker
This commit is contained in:
commit
6e775a9ecd
|
@ -170,6 +170,10 @@
|
||||||
<groupId>com.github.sgroschupf</groupId>
|
<groupId>com.github.sgroschupf</groupId>
|
||||||
<artifactId>zkclient</artifactId>
|
<artifactId>zkclient</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.google.code.simple-spring-memcached</groupId>
|
||||||
|
<artifactId>spymemcached</artifactId>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
<!-- Tests -->
|
<!-- Tests -->
|
||||||
<dependency>
|
<dependency>
|
||||||
|
@ -182,6 +186,11 @@
|
||||||
<artifactId>easymock</artifactId>
|
<artifactId>easymock</artifactId>
|
||||||
<scope>test</scope>
|
<scope>test</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.google.caliper</groupId>
|
||||||
|
<artifactId>caliper</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
<build>
|
<build>
|
||||||
|
|
|
@ -26,6 +26,6 @@ package com.metamx.druid.client.cache;
|
||||||
public interface Cache
|
public interface Cache
|
||||||
{
|
{
|
||||||
public byte[] get(byte[] key);
|
public byte[] get(byte[] key);
|
||||||
public byte[] put(byte[] key, byte[] value);
|
public void put(byte[] key, byte[] value);
|
||||||
public void close();
|
public void close();
|
||||||
}
|
}
|
||||||
|
|
|
@ -66,5 +66,6 @@ public class CacheMonitor extends AbstractMonitor
|
||||||
emitter.emit(builder.build(String.format("%s/evictions", metricPrefix), cacheStats.getNumEvictions()));
|
emitter.emit(builder.build(String.format("%s/evictions", metricPrefix), cacheStats.getNumEvictions()));
|
||||||
emitter.emit(builder.build(String.format("%s/hitRate", metricPrefix), cacheStats.hitRate()));
|
emitter.emit(builder.build(String.format("%s/hitRate", metricPrefix), cacheStats.hitRate()));
|
||||||
emitter.emit(builder.build(String.format("%s/averageBytes", metricPrefix), cacheStats.averageBytes()));
|
emitter.emit(builder.build(String.format("%s/averageBytes", metricPrefix), cacheStats.averageBytes()));
|
||||||
|
emitter.emit(builder.build(String.format("%s/timeouts", metricPrefix), cacheStats.getNumTimeouts()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,13 +28,15 @@ public class CacheStats
|
||||||
private final long size;
|
private final long size;
|
||||||
private final long sizeInBytes;
|
private final long sizeInBytes;
|
||||||
private final long numEvictions;
|
private final long numEvictions;
|
||||||
|
private final long numTimeouts;
|
||||||
|
|
||||||
public CacheStats(
|
public CacheStats(
|
||||||
long numHits,
|
long numHits,
|
||||||
long numMisses,
|
long numMisses,
|
||||||
long size,
|
long size,
|
||||||
long sizeInBytes,
|
long sizeInBytes,
|
||||||
long numEvictions
|
long numEvictions,
|
||||||
|
long numTimeouts
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
this.numHits = numHits;
|
this.numHits = numHits;
|
||||||
|
@ -42,6 +44,7 @@ public class CacheStats
|
||||||
this.size = size;
|
this.size = size;
|
||||||
this.sizeInBytes = sizeInBytes;
|
this.sizeInBytes = sizeInBytes;
|
||||||
this.numEvictions = numEvictions;
|
this.numEvictions = numEvictions;
|
||||||
|
this.numTimeouts = numTimeouts;
|
||||||
}
|
}
|
||||||
|
|
||||||
public long getNumHits()
|
public long getNumHits()
|
||||||
|
@ -69,6 +72,11 @@ public class CacheStats
|
||||||
return numEvictions;
|
return numEvictions;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public long getNumTimeouts()
|
||||||
|
{
|
||||||
|
return numTimeouts;
|
||||||
|
}
|
||||||
|
|
||||||
public long numLookups()
|
public long numLookups()
|
||||||
{
|
{
|
||||||
return numHits + numMisses;
|
return numHits + numMisses;
|
||||||
|
@ -95,7 +103,8 @@ public class CacheStats
|
||||||
numMisses - oldStats.numMisses,
|
numMisses - oldStats.numMisses,
|
||||||
size - oldStats.size,
|
size - oldStats.size,
|
||||||
sizeInBytes - oldStats.sizeInBytes,
|
sizeInBytes - oldStats.sizeInBytes,
|
||||||
numEvictions - oldStats.numEvictions
|
numEvictions - oldStats.numEvictions,
|
||||||
|
numTimeouts - oldStats.numTimeouts
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -77,7 +77,8 @@ public class MapCacheBroker implements CacheBroker
|
||||||
missCount.get(),
|
missCount.get(),
|
||||||
byteCountingLRUMap.size(),
|
byteCountingLRUMap.size(),
|
||||||
byteCountingLRUMap.getNumBytes(),
|
byteCountingLRUMap.getNumBytes(),
|
||||||
byteCountingLRUMap.getEvictionCount()
|
byteCountingLRUMap.getEvictionCount(),
|
||||||
|
0
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -112,11 +113,12 @@ public class MapCacheBroker implements CacheBroker
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public byte[] put(byte[] key, byte[] value)
|
public void put(byte[] key, byte[] value)
|
||||||
{
|
{
|
||||||
synchronized (clearLock) {
|
synchronized (clearLock) {
|
||||||
if (open) {
|
if (open) {
|
||||||
return baseMap.put(computeKey(key), value);
|
baseMap.put(computeKey(key), value);
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
throw new ISE("Cache for identifier[%s] is closed.", identifier);
|
throw new ISE("Cache for identifier[%s] is closed.", identifier);
|
||||||
|
|
145
client/src/main/java/com/metamx/druid/client/cache/MemcachedCacheBroker.java
vendored
Normal file
145
client/src/main/java/com/metamx/druid/client/cache/MemcachedCacheBroker.java
vendored
Normal file
|
@ -0,0 +1,145 @@
|
||||||
|
/*
|
||||||
|
* Druid - a distributed column store.
|
||||||
|
* Copyright (C) 2012 Metamarkets Group Inc.
|
||||||
|
*
|
||||||
|
* This program is free software; you can redistribute it and/or
|
||||||
|
* modify it under the terms of the GNU General Public License
|
||||||
|
* as published by the Free Software Foundation; either version 2
|
||||||
|
* of the License, or (at your option) any later version.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with this program; if not, write to the Free Software
|
||||||
|
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package com.metamx.druid.client.cache;
|
||||||
|
|
||||||
|
import com.google.common.base.Throwables;
|
||||||
|
import net.iharder.base64.Base64;
|
||||||
|
import net.spy.memcached.AddrUtil;
|
||||||
|
import net.spy.memcached.ConnectionFactoryBuilder;
|
||||||
|
import net.spy.memcached.DefaultHashAlgorithm;
|
||||||
|
import net.spy.memcached.FailureMode;
|
||||||
|
import net.spy.memcached.MemcachedClient;
|
||||||
|
import net.spy.memcached.MemcachedClientIF;
|
||||||
|
import net.spy.memcached.transcoders.SerializingTranscoder;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
|
public class MemcachedCacheBroker implements CacheBroker
|
||||||
|
{
|
||||||
|
public static MemcachedCacheBroker create(final MemcachedCacheBrokerConfig config)
|
||||||
|
{
|
||||||
|
try {
|
||||||
|
SerializingTranscoder transcoder = new SerializingTranscoder(config.getMaxObjectSize());
|
||||||
|
// disable compression
|
||||||
|
transcoder.setCompressionThreshold(Integer.MAX_VALUE);
|
||||||
|
|
||||||
|
return new MemcachedCacheBroker(
|
||||||
|
new MemcachedClient(
|
||||||
|
new ConnectionFactoryBuilder().setProtocol(ConnectionFactoryBuilder.Protocol.BINARY)
|
||||||
|
.setHashAlg(DefaultHashAlgorithm.FNV1A_64_HASH)
|
||||||
|
.setLocatorType(ConnectionFactoryBuilder.Locator.CONSISTENT)
|
||||||
|
.setDaemon(true)
|
||||||
|
.setFailureMode(FailureMode.Retry)
|
||||||
|
.setTranscoder(transcoder)
|
||||||
|
.setShouldOptimize(true)
|
||||||
|
.build(),
|
||||||
|
AddrUtil.getAddresses(config.getHosts())
|
||||||
|
),
|
||||||
|
config.getTimeout(),
|
||||||
|
config.getExpiration()
|
||||||
|
);
|
||||||
|
} catch(IOException e) {
|
||||||
|
throw Throwables.propagate(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private final int timeout;
|
||||||
|
private final int expiration;
|
||||||
|
|
||||||
|
private final MemcachedClientIF client;
|
||||||
|
|
||||||
|
private final AtomicLong hitCount = new AtomicLong(0);
|
||||||
|
private final AtomicLong missCount = new AtomicLong(0);
|
||||||
|
private final AtomicLong timeoutCount = new AtomicLong(0);
|
||||||
|
|
||||||
|
MemcachedCacheBroker(MemcachedClientIF client, int timeout, int expiration) {
|
||||||
|
this.timeout = timeout;
|
||||||
|
this.expiration = expiration;
|
||||||
|
this.client = client;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CacheStats getStats()
|
||||||
|
{
|
||||||
|
return new CacheStats(
|
||||||
|
hitCount.get(),
|
||||||
|
missCount.get(),
|
||||||
|
0,
|
||||||
|
0,
|
||||||
|
0,
|
||||||
|
timeoutCount.get()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Cache provideCache(final String identifier)
|
||||||
|
{
|
||||||
|
return new Cache()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public byte[] get(byte[] key)
|
||||||
|
{
|
||||||
|
Future<Object> future = client.asyncGet(computeKey(identifier, key));
|
||||||
|
try {
|
||||||
|
byte[] bytes = (byte[]) future.get(timeout, TimeUnit.MILLISECONDS);
|
||||||
|
if(bytes != null) {
|
||||||
|
hitCount.incrementAndGet();
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
missCount.incrementAndGet();
|
||||||
|
}
|
||||||
|
return bytes;
|
||||||
|
}
|
||||||
|
catch(TimeoutException e) {
|
||||||
|
timeoutCount.incrementAndGet();
|
||||||
|
future.cancel(false);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
catch(InterruptedException e) {
|
||||||
|
throw Throwables.propagate(e);
|
||||||
|
}
|
||||||
|
catch(ExecutionException e) {
|
||||||
|
throw Throwables.propagate(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void put(byte[] key, byte[] value)
|
||||||
|
{
|
||||||
|
client.set(computeKey(identifier, key), expiration, value);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close()
|
||||||
|
{
|
||||||
|
// no resources to cleanup
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
private String computeKey(String identifier, byte[] key) {
|
||||||
|
return identifier + Base64.encodeBytes(key, Base64.DONT_BREAK_LINES);
|
||||||
|
}
|
||||||
|
}
|
21
client/src/main/java/com/metamx/druid/client/cache/MemcachedCacheBrokerConfig.java
vendored
Normal file
21
client/src/main/java/com/metamx/druid/client/cache/MemcachedCacheBrokerConfig.java
vendored
Normal file
|
@ -0,0 +1,21 @@
|
||||||
|
package com.metamx.druid.client.cache;
|
||||||
|
|
||||||
|
import org.skife.config.Config;
|
||||||
|
import org.skife.config.Default;
|
||||||
|
|
||||||
|
public abstract class MemcachedCacheBrokerConfig
|
||||||
|
{
|
||||||
|
@Config("${prefix}.expiration")
|
||||||
|
@Default("31536000")
|
||||||
|
public abstract int getExpiration();
|
||||||
|
|
||||||
|
@Config("${prefix}.timeout")
|
||||||
|
@Default("500")
|
||||||
|
public abstract int getTimeout();
|
||||||
|
|
||||||
|
@Config("${prefix}.hosts")
|
||||||
|
public abstract String getHosts();
|
||||||
|
|
||||||
|
@Config("${prefix}.maxObjectSize")
|
||||||
|
public abstract int getMaxObjectSize();
|
||||||
|
}
|
97
client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheBrokerBenchmark.java
vendored
Normal file
97
client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheBrokerBenchmark.java
vendored
Normal file
|
@ -0,0 +1,97 @@
|
||||||
|
package com.metamx.druid.client.cache;
|
||||||
|
|
||||||
|
import com.google.caliper.Param;
|
||||||
|
import com.google.caliper.Runner;
|
||||||
|
import com.google.caliper.SimpleBenchmark;
|
||||||
|
import net.spy.memcached.AddrUtil;
|
||||||
|
import net.spy.memcached.ConnectionFactoryBuilder;
|
||||||
|
import net.spy.memcached.DefaultHashAlgorithm;
|
||||||
|
import net.spy.memcached.FailureMode;
|
||||||
|
import net.spy.memcached.MemcachedClient;
|
||||||
|
import net.spy.memcached.MemcachedClientIF;
|
||||||
|
import net.spy.memcached.transcoders.SerializingTranscoder;
|
||||||
|
|
||||||
|
import java.util.Random;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
public class MemcachedCacheBrokerBenchmark extends SimpleBenchmark
|
||||||
|
{
|
||||||
|
private static final String BASE_KEY = "test_2012-11-26T00:00:00.000Z_2012-11-27T00:00:00.000Z_2012-11-27T04:11:25.979Z_";
|
||||||
|
|
||||||
|
private MemcachedCacheBroker broker;
|
||||||
|
private MemcachedClientIF client;
|
||||||
|
|
||||||
|
private Cache cache;
|
||||||
|
private static byte[] randBytes;
|
||||||
|
|
||||||
|
// object size in kB
|
||||||
|
@Param({"1", "5", "10", "40"}) int objectSize;
|
||||||
|
@Param({"100", "1000"}) int objectCount;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void setUp() throws Exception
|
||||||
|
{
|
||||||
|
SerializingTranscoder transcoder = new SerializingTranscoder(
|
||||||
|
50 * 1024 * 1024 // 50 MB
|
||||||
|
);
|
||||||
|
// disable compression
|
||||||
|
transcoder.setCompressionThreshold(Integer.MAX_VALUE);
|
||||||
|
|
||||||
|
client = new MemcachedClient(
|
||||||
|
new ConnectionFactoryBuilder().setProtocol(ConnectionFactoryBuilder.Protocol.BINARY)
|
||||||
|
.setHashAlg(DefaultHashAlgorithm.FNV1A_64_HASH)
|
||||||
|
.setLocatorType(ConnectionFactoryBuilder.Locator.CONSISTENT)
|
||||||
|
.setDaemon(true)
|
||||||
|
.setFailureMode(FailureMode.Retry)
|
||||||
|
.setTranscoder(transcoder)
|
||||||
|
.setShouldOptimize(true)
|
||||||
|
.build(),
|
||||||
|
AddrUtil.getAddresses("localhost:11211")
|
||||||
|
);
|
||||||
|
|
||||||
|
broker = new MemcachedCacheBroker(
|
||||||
|
client,
|
||||||
|
500, // 500 milliseconds
|
||||||
|
3600 * 24 * 365 // 1 year
|
||||||
|
);
|
||||||
|
|
||||||
|
cache = broker.provideCache("default");
|
||||||
|
|
||||||
|
|
||||||
|
randBytes = new byte[objectSize * 1024];
|
||||||
|
new Random(0).nextBytes(randBytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void tearDown() throws Exception
|
||||||
|
{
|
||||||
|
client.flush();
|
||||||
|
client.shutdown();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void timePutObjects(int reps) {
|
||||||
|
for(int i = 0; i < reps; ++i) {
|
||||||
|
for(int k = 0; k < objectCount; ++k) {
|
||||||
|
String key = BASE_KEY + i;
|
||||||
|
cache.put(key.getBytes(), randBytes);
|
||||||
|
}
|
||||||
|
// make sure the write queue is empty
|
||||||
|
client.waitForQueues(1, TimeUnit.HOURS);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public byte[] timeGetObject(int reps) {
|
||||||
|
byte[] bytes = null;
|
||||||
|
for (int i = 0; i < reps; i++) {
|
||||||
|
for(int k = 0; k < objectCount; ++k) {
|
||||||
|
String key = BASE_KEY + i;
|
||||||
|
bytes = cache.get(key.getBytes());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return bytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void main(String[] args) throws Exception {
|
||||||
|
Runner.main(MemcachedCacheBrokerBenchmark.class, args);
|
||||||
|
}
|
||||||
|
}
|
618
client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheBrokerTest.java
vendored
Normal file
618
client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheBrokerTest.java
vendored
Normal file
|
@ -0,0 +1,618 @@
|
||||||
|
/*
|
||||||
|
* Druid - a distributed column store.
|
||||||
|
* Copyright (C) 2012 Metamarkets Group Inc.
|
||||||
|
*
|
||||||
|
* This program is free software; you can redistribute it and/or
|
||||||
|
* modify it under the terms of the GNU General Public License
|
||||||
|
* as published by the Free Software Foundation; either version 2
|
||||||
|
* of the License, or (at your option) any later version.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with this program; if not, write to the Free Software
|
||||||
|
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package com.metamx.druid.client.cache;
|
||||||
|
|
||||||
|
import com.google.common.primitives.Ints;
|
||||||
|
import net.spy.memcached.CASResponse;
|
||||||
|
import net.spy.memcached.CASValue;
|
||||||
|
import net.spy.memcached.CachedData;
|
||||||
|
import net.spy.memcached.ConnectionObserver;
|
||||||
|
import net.spy.memcached.MemcachedClientIF;
|
||||||
|
import net.spy.memcached.NodeLocator;
|
||||||
|
import net.spy.memcached.internal.BulkFuture;
|
||||||
|
import net.spy.memcached.transcoders.SerializingTranscoder;
|
||||||
|
import net.spy.memcached.transcoders.Transcoder;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.net.SocketAddress;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*/
|
||||||
|
public class MemcachedCacheBrokerTest
|
||||||
|
{
|
||||||
|
private static final byte[] HI = "hi".getBytes();
|
||||||
|
private static final byte[] HO = "ho".getBytes();
|
||||||
|
private MemcachedCacheBroker broker;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception
|
||||||
|
{
|
||||||
|
MemcachedClientIF client = new MockMemcachedClient();
|
||||||
|
broker = new MemcachedCacheBroker(client, 500, 3600);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSanity() throws Exception
|
||||||
|
{
|
||||||
|
Cache aCache = broker.provideCache("a");
|
||||||
|
Cache theCache = broker.provideCache("the");
|
||||||
|
|
||||||
|
Assert.assertNull(aCache.get(HI));
|
||||||
|
put(aCache, HI, 1);
|
||||||
|
Assert.assertEquals(1, get(aCache, HI));
|
||||||
|
Assert.assertNull(theCache.get(HI));
|
||||||
|
|
||||||
|
put(theCache, HI, 2);
|
||||||
|
Assert.assertEquals(1, get(aCache, HI));
|
||||||
|
Assert.assertEquals(2, get(theCache, HI));
|
||||||
|
|
||||||
|
put(theCache, HO, 10);
|
||||||
|
Assert.assertEquals(1, get(aCache, HI));
|
||||||
|
Assert.assertNull(aCache.get(HO));
|
||||||
|
Assert.assertEquals(2, get(theCache, HI));
|
||||||
|
Assert.assertEquals(10, get(theCache, HO));
|
||||||
|
|
||||||
|
theCache.close();
|
||||||
|
Assert.assertEquals(1, get(aCache, HI));
|
||||||
|
Assert.assertNull(aCache.get(HO));
|
||||||
|
|
||||||
|
aCache.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void put(Cache cache, byte[] key, Integer value)
|
||||||
|
{
|
||||||
|
cache.put(key, Ints.toByteArray(value));
|
||||||
|
}
|
||||||
|
|
||||||
|
public int get(Cache cache, byte[] key)
|
||||||
|
{
|
||||||
|
return Ints.fromByteArray(cache.get(key));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class MockMemcachedClient implements MemcachedClientIF
|
||||||
|
{
|
||||||
|
private final ConcurrentMap<String, CachedData> theMap = new ConcurrentHashMap<String, CachedData>();
|
||||||
|
private final Transcoder<Object> transcoder = new SerializingTranscoder();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Collection<SocketAddress> getAvailableServers()
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException("not implemented");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Collection<SocketAddress> getUnavailableServers()
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException("not implemented");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Transcoder<Object> getTranscoder()
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException("not implemented");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public NodeLocator getNodeLocator()
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException("not implemented");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Future<Boolean> append(long cas, String key, Object val)
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException("not implemented");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <T> Future<Boolean> append(long cas, String key, T val, Transcoder<T> tc)
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException("not implemented");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Future<Boolean> prepend(long cas, String key, Object val)
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException("not implemented");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <T> Future<Boolean> prepend(long cas, String key, T val, Transcoder<T> tc)
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException("not implemented");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <T> Future<CASResponse> asyncCAS(String key, long casId, T value, Transcoder<T> tc)
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException("not implemented");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Future<CASResponse> asyncCAS(String key, long casId, Object value)
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException("not implemented");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <T> CASResponse cas(String key, long casId, int exp, T value, Transcoder<T> tc)
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException("not implemented");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CASResponse cas(String key, long casId, Object value)
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException("not implemented");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <T> Future<Boolean> add(String key, int exp, T o, Transcoder<T> tc)
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException("not implemented");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Future<Boolean> add(String key, int exp, Object o)
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException("not implemented");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <T> Future<Boolean> set(String key, int exp, T o, Transcoder<T> tc)
|
||||||
|
{
|
||||||
|
theMap.put(key, tc.encode(o));
|
||||||
|
|
||||||
|
return new Future<Boolean>()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public boolean cancel(boolean b)
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isCancelled()
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isDone()
|
||||||
|
{
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Boolean get() throws InterruptedException, ExecutionException
|
||||||
|
{
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Boolean get(long l, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException
|
||||||
|
{
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Future<Boolean> set(String key, int exp, Object o)
|
||||||
|
{
|
||||||
|
return set(key, exp, o, transcoder);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <T> Future<Boolean> replace(String key, int exp, T o, Transcoder<T> tc)
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException("not implemented");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Future<Boolean> replace(String key, int exp, Object o)
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException("not implemented");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <T> Future<T> asyncGet(String key, final Transcoder<T> tc)
|
||||||
|
{
|
||||||
|
CachedData data = theMap.get(key);
|
||||||
|
final T theValue = data != null ? tc.decode(data) : null;
|
||||||
|
|
||||||
|
return new Future<T>()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public boolean cancel(boolean b)
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isCancelled()
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isDone()
|
||||||
|
{
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public T get() throws InterruptedException, ExecutionException
|
||||||
|
{
|
||||||
|
return theValue;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public T get(long l, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException
|
||||||
|
{
|
||||||
|
return theValue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Future<Object> asyncGet(String key)
|
||||||
|
{
|
||||||
|
return asyncGet(key, transcoder);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Future<CASValue<Object>> asyncGetAndTouch(String key, int exp)
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException("not implemented");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <T> Future<CASValue<T>> asyncGetAndTouch(String key, int exp, Transcoder<T> tc)
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException("not implemented");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CASValue<Object> getAndTouch(String key, int exp)
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException("not implemented");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <T> CASValue<T> getAndTouch(String key, int exp, Transcoder<T> tc)
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException("not implemented");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <T> Future<CASValue<T>> asyncGets(String key, Transcoder<T> tc)
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException("not implemented");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Future<CASValue<Object>> asyncGets(String key)
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException("not implemented");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <T> CASValue<T> gets(String key, Transcoder<T> tc)
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException("not implemented");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CASValue<Object> gets(String key)
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException("not implemented");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <T> T get(String key, Transcoder<T> tc)
|
||||||
|
{
|
||||||
|
CachedData data = theMap.get(key);
|
||||||
|
return data != null ? tc.decode(data) : null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Object get(String key)
|
||||||
|
{
|
||||||
|
return get(key, transcoder);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <T> BulkFuture<Map<String, T>> asyncGetBulk(Iterator<String> keys, Iterator<Transcoder<T>> tcs)
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException("not implemented");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <T> BulkFuture<Map<String, T>> asyncGetBulk(Collection<String> keys, Iterator<Transcoder<T>> tcs)
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException("not implemented");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <T> BulkFuture<Map<String, T>> asyncGetBulk(Iterator<String> keys, Transcoder<T> tc)
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException("not implemented");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <T> BulkFuture<Map<String, T>> asyncGetBulk(Collection<String> keys, Transcoder<T> tc)
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException("not implemented");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public BulkFuture<Map<String, Object>> asyncGetBulk(Iterator<String> keys)
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException("not implemented");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public BulkFuture<Map<String, Object>> asyncGetBulk(Collection<String> keys)
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException("not implemented");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <T> BulkFuture<Map<String, T>> asyncGetBulk(Transcoder<T> tc, String... keys)
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException("not implemented");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public BulkFuture<Map<String, Object>> asyncGetBulk(String... keys)
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException("not implemented");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <T> Map<String, T> getBulk(Iterator<String> keys, Transcoder<T> tc)
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException("not implemented");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <T> Map<String, T> getBulk(Collection<String> keys, Transcoder<T> tc)
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException("not implemented");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Map<String, Object> getBulk(Iterator<String> keys)
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException("not implemented");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Map<String, Object> getBulk(Collection<String> keys)
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException("not implemented");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <T> Map<String, T> getBulk(Transcoder<T> tc, String... keys)
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException("not implemented");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Map<String, Object> getBulk(String... keys)
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException("not implemented");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <T> Future<Boolean> touch(String key, int exp, Transcoder<T> tc)
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException("not implemented");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <T> Future<Boolean> touch(String key, int exp)
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException("not implemented");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Map<SocketAddress, String> getVersions()
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException("not implemented");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Map<SocketAddress, Map<String, String>> getStats()
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException("not implemented");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Map<SocketAddress, Map<String, String>> getStats(String prefix)
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException("not implemented");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long incr(String key, long by)
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException("not implemented");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long incr(String key, int by)
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException("not implemented");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long decr(String key, long by)
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException("not implemented");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long decr(String key, int by)
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException("not implemented");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long incr(String key, long by, long def, int exp)
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException("not implemented");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long incr(String key, int by, long def, int exp)
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException("not implemented");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long decr(String key, long by, long def, int exp)
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException("not implemented");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long decr(String key, int by, long def, int exp)
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException("not implemented");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Future<Long> asyncIncr(String key, long by)
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException("not implemented");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Future<Long> asyncIncr(String key, int by)
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException("not implemented");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Future<Long> asyncDecr(String key, long by)
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException("not implemented");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Future<Long> asyncDecr(String key, int by)
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException("not implemented");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long incr(String key, long by, long def)
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException("not implemented");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long incr(String key, int by, long def)
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException("not implemented");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long decr(String key, long by, long def)
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException("not implemented");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long decr(String key, int by, long def)
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException("not implemented");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Future<Boolean> delete(String key)
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException("not implemented");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Future<Boolean> flush(int delay)
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException("not implemented");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Future<Boolean> flush()
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException("not implemented");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void shutdown()
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException("not implemented");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean shutdown(long timeout, TimeUnit unit)
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException("not implemented");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean waitForQueues(long timeout, TimeUnit unit)
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException("not implemented");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean addObserver(ConnectionObserver obs)
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException("not implemented");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean removeObserver(ConnectionObserver obs)
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException("not implemented");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Set<String> listSaslMechanisms()
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException("not implemented");
|
||||||
|
}
|
||||||
|
};
|
12
pom.xml
12
pom.xml
|
@ -257,6 +257,11 @@
|
||||||
<artifactId>zkclient</artifactId>
|
<artifactId>zkclient</artifactId>
|
||||||
<version>0.1</version>
|
<version>0.1</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.google.code.simple-spring-memcached</groupId>
|
||||||
|
<artifactId>spymemcached</artifactId>
|
||||||
|
<version>2.8.4</version>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
<!-- Test Scope -->
|
<!-- Test Scope -->
|
||||||
<dependency>
|
<dependency>
|
||||||
|
@ -279,6 +284,13 @@
|
||||||
<version>4.8.1</version>
|
<version>4.8.1</version>
|
||||||
<scope>test</scope>
|
<scope>test</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.google.caliper</groupId>
|
||||||
|
<artifactId>caliper</artifactId>
|
||||||
|
<version>0.5-rc1</version>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
</dependencies>
|
</dependencies>
|
||||||
</dependencyManagement>
|
</dependencyManagement>
|
||||||
<build>
|
<build>
|
||||||
|
|
Loading…
Reference in New Issue