diff --git a/client/pom.xml b/client/pom.xml
index 5db815ff52e..eb5f801b9d3 100644
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -170,6 +170,11 @@
com.github.sgroschupf
zkclient
+
+ com.google.code.simple-spring-memcached
+ spymemcached
+ 2.8.4
+
diff --git a/client/src/main/java/com/metamx/druid/client/cache/Cache.java b/client/src/main/java/com/metamx/druid/client/cache/Cache.java
index a9e9ebdb12c..e7907c9548f 100644
--- a/client/src/main/java/com/metamx/druid/client/cache/Cache.java
+++ b/client/src/main/java/com/metamx/druid/client/cache/Cache.java
@@ -26,6 +26,6 @@ package com.metamx.druid.client.cache;
public interface Cache
{
public byte[] get(byte[] key);
- public byte[] put(byte[] key, byte[] value);
+ public void put(byte[] key, byte[] value);
public void close();
}
diff --git a/client/src/main/java/com/metamx/druid/client/cache/CacheStats.java b/client/src/main/java/com/metamx/druid/client/cache/CacheStats.java
index 822a78fbbc1..1a9950c8698 100644
--- a/client/src/main/java/com/metamx/druid/client/cache/CacheStats.java
+++ b/client/src/main/java/com/metamx/druid/client/cache/CacheStats.java
@@ -28,13 +28,15 @@ public class CacheStats
private final long size;
private final long sizeInBytes;
private final long numEvictions;
+ private final long numTimeouts;
public CacheStats(
long numHits,
long numMisses,
long size,
long sizeInBytes,
- long numEvictions
+ long numEvictions,
+ long numTimeouts
)
{
this.numHits = numHits;
@@ -42,6 +44,7 @@ public class CacheStats
this.size = size;
this.sizeInBytes = sizeInBytes;
this.numEvictions = numEvictions;
+ this.numTimeouts = numTimeouts;
}
public long getNumHits()
@@ -69,6 +72,11 @@ public class CacheStats
return numEvictions;
}
+ public long getNumTimeouts()
+ {
+ return numTimeouts;
+ }
+
public long numLookups()
{
return numHits + numMisses;
@@ -95,7 +103,8 @@ public class CacheStats
numMisses - oldStats.numMisses,
size - oldStats.size,
sizeInBytes - oldStats.sizeInBytes,
- numEvictions - oldStats.numEvictions
+ numEvictions - oldStats.numEvictions,
+ numTimeouts - oldStats.numTimeouts
);
}
}
diff --git a/client/src/main/java/com/metamx/druid/client/cache/MapCacheBroker.java b/client/src/main/java/com/metamx/druid/client/cache/MapCacheBroker.java
index 943c5aae154..d8ec202021a 100644
--- a/client/src/main/java/com/metamx/druid/client/cache/MapCacheBroker.java
+++ b/client/src/main/java/com/metamx/druid/client/cache/MapCacheBroker.java
@@ -77,7 +77,8 @@ public class MapCacheBroker implements CacheBroker
missCount.get(),
byteCountingLRUMap.size(),
byteCountingLRUMap.getNumBytes(),
- byteCountingLRUMap.getEvictionCount()
+ byteCountingLRUMap.getEvictionCount(),
+ 0
);
}
@@ -112,11 +113,12 @@ public class MapCacheBroker implements CacheBroker
}
@Override
- public byte[] put(byte[] key, byte[] value)
+ public void put(byte[] key, byte[] value)
{
synchronized (clearLock) {
if (open) {
- return baseMap.put(computeKey(key), value);
+ baseMap.put(computeKey(key), value);
+ return;
}
}
throw new ISE("Cache for identifier[%s] is closed.", identifier);
diff --git a/client/src/main/java/com/metamx/druid/client/cache/MemcachedCacheBroker.java b/client/src/main/java/com/metamx/druid/client/cache/MemcachedCacheBroker.java
new file mode 100644
index 00000000000..ffa5faca5cc
--- /dev/null
+++ b/client/src/main/java/com/metamx/druid/client/cache/MemcachedCacheBroker.java
@@ -0,0 +1,111 @@
+package com.metamx.druid.client.cache;
+
+import net.iharder.base64.Base64;
+import net.spy.memcached.AddrUtil;
+import net.spy.memcached.ConnectionFactoryBuilder;
+import net.spy.memcached.MemcachedClient;
+import net.spy.memcached.MemcachedClientIF;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class MemcachedCacheBroker implements CacheBroker
+{
+ public static CacheBroker create(final MemcachedCacheBrokerConfig config)
+ {
+ try {
+ return new MemcachedCacheBroker(
+ new MemcachedClient(
+ new ConnectionFactoryBuilder().setProtocol(ConnectionFactoryBuilder.Protocol.BINARY).build(),
+ AddrUtil.getAddresses(config.getHosts())
+ ),
+ config.getTimeout(),
+ config.getExpiration()
+ );
+ } catch(IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private final int timeout;
+ private final int expiration;
+
+ private final MemcachedClientIF client;
+
+ private final AtomicLong hitCount = new AtomicLong(0);
+ private final AtomicLong missCount = new AtomicLong(0);
+ private final AtomicLong timeoutCount = new AtomicLong(0);
+
+ MemcachedCacheBroker(MemcachedClientIF client, int timeout, int expiration) {
+ this.timeout = timeout;
+ this.expiration = expiration;
+ this.client = client;
+ }
+
+ @Override
+ public CacheStats getStats()
+ {
+ return new CacheStats(
+ hitCount.get(),
+ missCount.get(),
+ 0,
+ 0,
+ 0,
+ timeoutCount.get()
+ );
+ }
+
+ @Override
+ public Cache provideCache(final String identifier)
+ {
+ return new Cache()
+ {
+ @Override
+ public byte[] get(byte[] key)
+ {
+ Future