diff --git a/client/src/main/java/com/metamx/druid/client/cache/MemcachedCache.java b/client/src/main/java/com/metamx/druid/client/cache/MemcachedCache.java
index ed7f5292e8a..8562cbf202d 100644
--- a/client/src/main/java/com/metamx/druid/client/cache/MemcachedCache.java
+++ b/client/src/main/java/com/metamx/druid/client/cache/MemcachedCache.java
@@ -24,6 +24,9 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Maps;
 import com.google.common.primitives.Ints;
+import com.metamx.common.logger.Logger;
+import com.metamx.emitter.service.ServiceEmitter;
+import com.metamx.emitter.service.ServiceEventBuilder;
 import net.spy.memcached.AddrUtil;
 import net.spy.memcached.ConnectionFactoryBuilder;
 import net.spy.memcached.DefaultHashAlgorithm;
@@ -47,6 +50,8 @@ import java.util.concurrent.atomic.AtomicLong;
 
 public class MemcachedCache implements Cache
 {
+  private static final Logger log = new Logger(MemcachedCache.class);
+
   public static MemcachedCache create(final MemcachedCacheConfig config)
   {
     try {
@@ -60,9 +65,11 @@ public class MemcachedCache implements Cache
               .setHashAlg(DefaultHashAlgorithm.FNV1A_64_HASH)
               .setLocatorType(ConnectionFactoryBuilder.Locator.CONSISTENT)
               .setDaemon(true)
-              .setFailureMode(FailureMode.Retry)
+              .setFailureMode(FailureMode.Cancel)
               .setTranscoder(transcoder)
               .setShouldOptimize(true)
+              .setOpQueueMaxBlockTime(config.getTimeout())
+              .setOpTimeout(config.getTimeout())
               .build(),
           AddrUtil.getAddresses(config.getHosts())
         ),
@@ -112,7 +119,14 @@ public class MemcachedCache implements Cache
   @Override
   public byte[] get(NamedKey key)
   {
-    Future<Object> future = client.asyncGet(computeKeyHash(memcachedPrefix, key));
+    Future<Object> future;
+    try {
+      future = client.asyncGet(computeKeyHash(memcachedPrefix, key));
+    } catch(IllegalStateException e) {
+      // operation did not get queued in time (queue is full)
+      log.warn(e, "Unable to queue cache operation");
+      return null;
+    }
     try {
       byte[] bytes = (byte[]) future.get(timeout, TimeUnit.MILLISECONDS);
       if(bytes != null) {
@@ -140,7 +154,12 @@
   @Override
   public void put(NamedKey key, byte[] value)
   {
-    client.set(computeKeyHash(memcachedPrefix, key), expiration, serializeValue(key, value));
+    try {
+      client.set(computeKeyHash(memcachedPrefix, key), expiration, serializeValue(key, value));
+    } catch(IllegalStateException e) {
+      // operation did not get queued in time (queue is full)
+      log.warn(e, "Unable to queue cache operation");
+    }
   }
 
   private static byte[] serializeValue(NamedKey key, byte[] value)
   {
@@ -183,7 +202,17 @@ public class MemcachedCache implements Cache
         }
     );
 
-    BulkFuture<Map<String, Object>> future = client.asyncGetBulk(keyLookup.keySet());
+    Map<NamedKey, byte[]> results = Maps.newHashMap();
+
+    BulkFuture<Map<String, Object>> future;
+    try {
+      future = client.asyncGetBulk(keyLookup.keySet());
+    } catch(IllegalStateException e) {
+      timeoutCount.incrementAndGet();
+      // operation did not get queued in time (queue is full)
+      log.warn(e, "Unable to queue cache operation");
+      return results;
+    }
 
     try {
       Map<String, Object> some = future.getSome(timeout, TimeUnit.MILLISECONDS);
@@ -195,7 +224,6 @@ public class MemcachedCache implements Cache
       missCount.addAndGet(keyLookup.size() - some.size());
      hitCount.addAndGet(some.size());
 
-      Map<NamedKey, byte[]> results = Maps.newHashMap();
       for(Map.Entry<String, Object> entry : some.entrySet()) {
         final NamedKey key = keyLookup.get(entry.getKey());
         final byte[] value = (byte[]) entry.getValue();
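
Below is a minimal, standalone sketch (not part of the patch) of the fail-fast behavior this change is after: bounding how long spymemcached may block when its operation queue is full, and treating a rejected operation (IllegalStateException) as a cache miss instead of blocking the caller. It assumes spymemcached on the classpath and a memcached instance at 127.0.0.1:11211; the 500 ms value and the class name FailFastMemcachedExample are illustrative, with the timeout standing in for config.getTimeout().

// Standalone sketch, assuming spymemcached is on the classpath.
import net.spy.memcached.AddrUtil;
import net.spy.memcached.ConnectionFactoryBuilder;
import net.spy.memcached.FailureMode;
import net.spy.memcached.MemcachedClient;

import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class FailFastMemcachedExample
{
  public static void main(String[] args) throws Exception
  {
    final long timeout = 500; // illustrative stand-in for config.getTimeout()

    MemcachedClient client = new MemcachedClient(
        new ConnectionFactoryBuilder()
            .setDaemon(true)
            // Cancel operations queued against a failed node instead of retrying them,
            // so requests do not pile up behind a dead memcached host.
            .setFailureMode(FailureMode.Cancel)
            // Bound both how long we wait to enqueue an operation and how long
            // an operation may take once it is queued.
            .setOpQueueMaxBlockTime(timeout)
            .setOpTimeout(timeout)
            .build(),
        AddrUtil.getAddresses("127.0.0.1:11211") // hypothetical host:port
    );

    try {
      // With a bounded op queue, spymemcached throws IllegalStateException when the
      // queue is full; treat that as a cache miss rather than blocking the caller.
      Future<Object> future = client.asyncGet("some-key");
      Object value = future.get(timeout, TimeUnit.MILLISECONDS);
      System.out.println("cache value: " + value);
    }
    catch (IllegalStateException e) {
      System.out.println("op queue full, treating as cache miss: " + e.getMessage());
    }
    finally {
      client.shutdown();
    }
  }
}

The switch from FailureMode.Retry to FailureMode.Cancel matters here because Retry keeps operations queued for a failed node and replays them later, while Cancel fails them immediately, which is what keeps cache callers from stalling on a dead memcached host.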