diff --git a/client/src/main/java/com/metamx/druid/client/cache/CacheMonitor.java b/client/src/main/java/com/metamx/druid/client/cache/CacheMonitor.java index b0c36629e89..e89c1113e8d 100644 --- a/client/src/main/java/com/metamx/druid/client/cache/CacheMonitor.java +++ b/client/src/main/java/com/metamx/druid/client/cache/CacheMonitor.java @@ -67,5 +67,6 @@ public class CacheMonitor extends AbstractMonitor emitter.emit(builder.build(String.format("%s/hitRate", metricPrefix), cacheStats.hitRate())); emitter.emit(builder.build(String.format("%s/averageBytes", metricPrefix), cacheStats.averageBytes())); emitter.emit(builder.build(String.format("%s/timeouts", metricPrefix), cacheStats.getNumTimeouts())); + emitter.emit(builder.build(String.format("%s/errors", metricPrefix), cacheStats.getNumErrors())); } } diff --git a/client/src/main/java/com/metamx/druid/client/cache/CacheStats.java b/client/src/main/java/com/metamx/druid/client/cache/CacheStats.java index 1a9950c8698..33f0a145082 100644 --- a/client/src/main/java/com/metamx/druid/client/cache/CacheStats.java +++ b/client/src/main/java/com/metamx/druid/client/cache/CacheStats.java @@ -29,6 +29,7 @@ public class CacheStats private final long sizeInBytes; private final long numEvictions; private final long numTimeouts; + private final long numErrors; public CacheStats( long numHits, @@ -36,7 +37,8 @@ public class CacheStats long size, long sizeInBytes, long numEvictions, - long numTimeouts + long numTimeouts, + long numErrors ) { this.numHits = numHits; @@ -45,6 +47,7 @@ public class CacheStats this.sizeInBytes = sizeInBytes; this.numEvictions = numEvictions; this.numTimeouts = numTimeouts; + this.numErrors = numErrors; } public long getNumHits() @@ -77,6 +80,11 @@ public class CacheStats return numTimeouts; } + public long getNumErrors() + { + return numErrors; + } + public long numLookups() { return numHits + numMisses; @@ -104,7 +112,8 @@ public class CacheStats size - oldStats.size, sizeInBytes - oldStats.sizeInBytes, numEvictions - oldStats.numEvictions, - numTimeouts - oldStats.numTimeouts + numTimeouts - oldStats.numTimeouts, + numErrors - oldStats.numErrors ); } } diff --git a/client/src/main/java/com/metamx/druid/client/cache/MapCache.java b/client/src/main/java/com/metamx/druid/client/cache/MapCache.java index 53e1e20280a..bf549ec31b9 100644 --- a/client/src/main/java/com/metamx/druid/client/cache/MapCache.java +++ b/client/src/main/java/com/metamx/druid/client/cache/MapCache.java @@ -76,6 +76,7 @@ public class MapCache implements Cache byteCountingLRUMap.size(), byteCountingLRUMap.getNumBytes(), byteCountingLRUMap.getEvictionCount(), + 0, 0 ); } diff --git a/client/src/main/java/com/metamx/druid/client/cache/MemcachedCache.java b/client/src/main/java/com/metamx/druid/client/cache/MemcachedCache.java index ed7f5292e8a..436c606d496 100644 --- a/client/src/main/java/com/metamx/druid/client/cache/MemcachedCache.java +++ b/client/src/main/java/com/metamx/druid/client/cache/MemcachedCache.java @@ -24,6 +24,9 @@ import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.collect.Maps; import com.google.common.primitives.Ints; +import com.metamx.common.logger.Logger; +import com.metamx.emitter.service.ServiceEmitter; +import com.metamx.emitter.service.ServiceEventBuilder; import net.spy.memcached.AddrUtil; import net.spy.memcached.ConnectionFactoryBuilder; import net.spy.memcached.DefaultHashAlgorithm; @@ -47,6 +50,8 @@ import java.util.concurrent.atomic.AtomicLong; public class MemcachedCache implements Cache { + private static final Logger log = new Logger(MemcachedCache.class); + public static MemcachedCache create(final MemcachedCacheConfig config) { try { @@ -60,9 +65,11 @@ public class MemcachedCache implements Cache .setHashAlg(DefaultHashAlgorithm.FNV1A_64_HASH) .setLocatorType(ConnectionFactoryBuilder.Locator.CONSISTENT) .setDaemon(true) - .setFailureMode(FailureMode.Retry) + .setFailureMode(FailureMode.Cancel) .setTranscoder(transcoder) .setShouldOptimize(true) + .setOpQueueMaxBlockTime(config.getTimeout()) + .setOpTimeout(config.getTimeout()) .build(), AddrUtil.getAddresses(config.getHosts()) ), @@ -84,6 +91,7 @@ public class MemcachedCache implements Cache private final AtomicLong hitCount = new AtomicLong(0); private final AtomicLong missCount = new AtomicLong(0); private final AtomicLong timeoutCount = new AtomicLong(0); + private final AtomicLong errorCount = new AtomicLong(0); MemcachedCache(MemcachedClientIF client, String memcachedPrefix, int timeout, int expiration) { Preconditions.checkArgument(memcachedPrefix.length() <= MAX_PREFIX_LENGTH, @@ -105,14 +113,23 @@ public class MemcachedCache implements Cache 0, 0, 0, - timeoutCount.get() + timeoutCount.get(), + errorCount.get() ); } @Override public byte[] get(NamedKey key) { - Future future = client.asyncGet(computeKeyHash(memcachedPrefix, key)); + Future future; + try { + future = client.asyncGet(computeKeyHash(memcachedPrefix, key)); + } catch(IllegalStateException e) { + // operation did not get queued in time (queue is full) + errorCount.incrementAndGet(); + log.warn(e, "Unable to queue cache operation"); + return null; + } try { byte[] bytes = (byte[]) future.get(timeout, TimeUnit.MILLISECONDS); if(bytes != null) { @@ -133,14 +150,22 @@ public class MemcachedCache implements Cache throw Throwables.propagate(e); } catch(ExecutionException e) { - throw Throwables.propagate(e); + errorCount.incrementAndGet(); + log.warn(e, "Exception pulling item from cache"); + return null; } } @Override public void put(NamedKey key, byte[] value) { - client.set(computeKeyHash(memcachedPrefix, key), expiration, serializeValue(key, value)); + try { + client.set(computeKeyHash(memcachedPrefix, key), expiration, serializeValue(key, value)); + } catch(IllegalStateException e) { + // operation did not get queued in time (queue is full) + errorCount.incrementAndGet(); + log.warn(e, "Unable to queue cache operation"); + } } private static byte[] serializeValue(NamedKey key, byte[] value) { @@ -183,7 +208,17 @@ public class MemcachedCache implements Cache } ); - BulkFuture> future = client.asyncGetBulk(keyLookup.keySet()); + Map results = Maps.newHashMap(); + + BulkFuture> future; + try { + future = client.asyncGetBulk(keyLookup.keySet()); + } catch(IllegalStateException e) { + // operation did not get queued in time (queue is full) + errorCount.incrementAndGet(); + log.warn(e, "Unable to queue cache operation"); + return results; + } try { Map some = future.getSome(timeout, TimeUnit.MILLISECONDS); @@ -195,7 +230,6 @@ public class MemcachedCache implements Cache missCount.addAndGet(keyLookup.size() - some.size()); hitCount.addAndGet(some.size()); - Map results = Maps.newHashMap(); for(Map.Entry entry : some.entrySet()) { final NamedKey key = keyLookup.get(entry.getKey()); final byte[] value = (byte[]) entry.getValue(); @@ -212,7 +246,9 @@ public class MemcachedCache implements Cache throw Throwables.propagate(e); } catch(ExecutionException e) { - throw Throwables.propagate(e); + errorCount.incrementAndGet(); + log.warn(e, "Exception pulling item from cache"); + return results; } }