diff --git a/client/pom.xml b/client/pom.xml
index 5db815ff52e..29ea77b4949 100644
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -170,6 +170,10 @@
com.github.sgroschupfzkclient
+
+ com.google.code.simple-spring-memcached
+ spymemcached
+
@@ -182,6 +186,11 @@
easymocktest
+
+ com.google.caliper
+ caliper
+ test
+
diff --git a/client/src/main/java/com/metamx/druid/client/cache/Cache.java b/client/src/main/java/com/metamx/druid/client/cache/Cache.java
index a9e9ebdb12c..e7907c9548f 100644
--- a/client/src/main/java/com/metamx/druid/client/cache/Cache.java
+++ b/client/src/main/java/com/metamx/druid/client/cache/Cache.java
@@ -26,6 +26,6 @@ package com.metamx.druid.client.cache;
public interface Cache
{
public byte[] get(byte[] key);
- public byte[] put(byte[] key, byte[] value);
+ public void put(byte[] key, byte[] value);
public void close();
}
diff --git a/client/src/main/java/com/metamx/druid/client/cache/CacheMonitor.java b/client/src/main/java/com/metamx/druid/client/cache/CacheMonitor.java
index 9b74df56159..d1337163ba6 100644
--- a/client/src/main/java/com/metamx/druid/client/cache/CacheMonitor.java
+++ b/client/src/main/java/com/metamx/druid/client/cache/CacheMonitor.java
@@ -66,5 +66,6 @@ public class CacheMonitor extends AbstractMonitor
emitter.emit(builder.build(String.format("%s/evictions", metricPrefix), cacheStats.getNumEvictions()));
emitter.emit(builder.build(String.format("%s/hitRate", metricPrefix), cacheStats.hitRate()));
emitter.emit(builder.build(String.format("%s/averageBytes", metricPrefix), cacheStats.averageBytes()));
+ emitter.emit(builder.build(String.format("%s/timeouts", metricPrefix), cacheStats.getNumTimeouts()));
}
}
diff --git a/client/src/main/java/com/metamx/druid/client/cache/CacheStats.java b/client/src/main/java/com/metamx/druid/client/cache/CacheStats.java
index 822a78fbbc1..1a9950c8698 100644
--- a/client/src/main/java/com/metamx/druid/client/cache/CacheStats.java
+++ b/client/src/main/java/com/metamx/druid/client/cache/CacheStats.java
@@ -28,13 +28,15 @@ public class CacheStats
private final long size;
private final long sizeInBytes;
private final long numEvictions;
+ private final long numTimeouts;
public CacheStats(
long numHits,
long numMisses,
long size,
long sizeInBytes,
- long numEvictions
+ long numEvictions,
+ long numTimeouts
)
{
this.numHits = numHits;
@@ -42,6 +44,7 @@ public class CacheStats
this.size = size;
this.sizeInBytes = sizeInBytes;
this.numEvictions = numEvictions;
+ this.numTimeouts = numTimeouts;
}
public long getNumHits()
@@ -69,6 +72,11 @@ public class CacheStats
return numEvictions;
}
+ public long getNumTimeouts()
+ {
+ return numTimeouts;
+ }
+
public long numLookups()
{
return numHits + numMisses;
@@ -95,7 +103,8 @@ public class CacheStats
numMisses - oldStats.numMisses,
size - oldStats.size,
sizeInBytes - oldStats.sizeInBytes,
- numEvictions - oldStats.numEvictions
+ numEvictions - oldStats.numEvictions,
+ numTimeouts - oldStats.numTimeouts
);
}
}
diff --git a/client/src/main/java/com/metamx/druid/client/cache/MapCacheBroker.java b/client/src/main/java/com/metamx/druid/client/cache/MapCacheBroker.java
index 943c5aae154..d8ec202021a 100644
--- a/client/src/main/java/com/metamx/druid/client/cache/MapCacheBroker.java
+++ b/client/src/main/java/com/metamx/druid/client/cache/MapCacheBroker.java
@@ -77,7 +77,8 @@ public class MapCacheBroker implements CacheBroker
missCount.get(),
byteCountingLRUMap.size(),
byteCountingLRUMap.getNumBytes(),
- byteCountingLRUMap.getEvictionCount()
+ byteCountingLRUMap.getEvictionCount(),
+ 0
);
}
@@ -112,11 +113,12 @@ public class MapCacheBroker implements CacheBroker
}
@Override
- public byte[] put(byte[] key, byte[] value)
+ public void put(byte[] key, byte[] value)
{
synchronized (clearLock) {
if (open) {
- return baseMap.put(computeKey(key), value);
+ baseMap.put(computeKey(key), value);
+ return;
}
}
throw new ISE("Cache for identifier[%s] is closed.", identifier);
diff --git a/client/src/main/java/com/metamx/druid/client/cache/MemcachedCacheBroker.java b/client/src/main/java/com/metamx/druid/client/cache/MemcachedCacheBroker.java
new file mode 100644
index 00000000000..2f1af877d8c
--- /dev/null
+++ b/client/src/main/java/com/metamx/druid/client/cache/MemcachedCacheBroker.java
@@ -0,0 +1,145 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package com.metamx.druid.client.cache;
+
+import com.google.common.base.Throwables;
+import net.iharder.base64.Base64;
+import net.spy.memcached.AddrUtil;
+import net.spy.memcached.ConnectionFactoryBuilder;
+import net.spy.memcached.DefaultHashAlgorithm;
+import net.spy.memcached.FailureMode;
+import net.spy.memcached.MemcachedClient;
+import net.spy.memcached.MemcachedClientIF;
+import net.spy.memcached.transcoders.SerializingTranscoder;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class MemcachedCacheBroker implements CacheBroker
+{
+ public static MemcachedCacheBroker create(final MemcachedCacheBrokerConfig config)
+ {
+ try {
+ SerializingTranscoder transcoder = new SerializingTranscoder(config.getMaxObjectSize());
+ // disable compression
+ transcoder.setCompressionThreshold(Integer.MAX_VALUE);
+
+ return new MemcachedCacheBroker(
+ new MemcachedClient(
+ new ConnectionFactoryBuilder().setProtocol(ConnectionFactoryBuilder.Protocol.BINARY)
+ .setHashAlg(DefaultHashAlgorithm.FNV1A_64_HASH)
+ .setLocatorType(ConnectionFactoryBuilder.Locator.CONSISTENT)
+ .setDaemon(true)
+ .setFailureMode(FailureMode.Retry)
+ .setTranscoder(transcoder)
+ .setShouldOptimize(true)
+ .build(),
+ AddrUtil.getAddresses(config.getHosts())
+ ),
+ config.getTimeout(),
+ config.getExpiration()
+ );
+ } catch(IOException e) {
+ throw Throwables.propagate(e);
+ }
+ }
+
+ private final int timeout;
+ private final int expiration;
+
+ private final MemcachedClientIF client;
+
+ private final AtomicLong hitCount = new AtomicLong(0);
+ private final AtomicLong missCount = new AtomicLong(0);
+ private final AtomicLong timeoutCount = new AtomicLong(0);
+
+ MemcachedCacheBroker(MemcachedClientIF client, int timeout, int expiration) {
+ this.timeout = timeout;
+ this.expiration = expiration;
+ this.client = client;
+ }
+
+ @Override
+ public CacheStats getStats()
+ {
+ return new CacheStats(
+ hitCount.get(),
+ missCount.get(),
+ 0,
+ 0,
+ 0,
+ timeoutCount.get()
+ );
+ }
+
+ @Override
+ public Cache provideCache(final String identifier)
+ {
+ return new Cache()
+ {
+ @Override
+ public byte[] get(byte[] key)
+ {
+ Future