From 8f6c313561ef1040421e0aec9c20c9b572adeb2e Mon Sep 17 00:00:00 2001 From: xvrl Date: Wed, 6 Mar 2013 17:28:09 -0800 Subject: [PATCH 1/3] log but do not fail if memcached operations do not get queued in time --- .../druid/client/cache/MemcachedCache.java | 38 ++++++++++++++++--- 1 file changed, 33 insertions(+), 5 deletions(-) diff --git a/client/src/main/java/com/metamx/druid/client/cache/MemcachedCache.java b/client/src/main/java/com/metamx/druid/client/cache/MemcachedCache.java index ed7f5292e8a..8562cbf202d 100644 --- a/client/src/main/java/com/metamx/druid/client/cache/MemcachedCache.java +++ b/client/src/main/java/com/metamx/druid/client/cache/MemcachedCache.java @@ -24,6 +24,9 @@ import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.collect.Maps; import com.google.common.primitives.Ints; +import com.metamx.common.logger.Logger; +import com.metamx.emitter.service.ServiceEmitter; +import com.metamx.emitter.service.ServiceEventBuilder; import net.spy.memcached.AddrUtil; import net.spy.memcached.ConnectionFactoryBuilder; import net.spy.memcached.DefaultHashAlgorithm; @@ -47,6 +50,8 @@ import java.util.concurrent.atomic.AtomicLong; public class MemcachedCache implements Cache { + private static final Logger log = new Logger(MemcachedCache.class); + public static MemcachedCache create(final MemcachedCacheConfig config) { try { @@ -60,9 +65,11 @@ public class MemcachedCache implements Cache .setHashAlg(DefaultHashAlgorithm.FNV1A_64_HASH) .setLocatorType(ConnectionFactoryBuilder.Locator.CONSISTENT) .setDaemon(true) - .setFailureMode(FailureMode.Retry) + .setFailureMode(FailureMode.Cancel) .setTranscoder(transcoder) .setShouldOptimize(true) + .setOpQueueMaxBlockTime(config.getTimeout()) + .setOpTimeout(config.getTimeout()) .build(), AddrUtil.getAddresses(config.getHosts()) ), @@ -112,7 +119,14 @@ public class MemcachedCache implements Cache @Override public byte[] get(NamedKey key) { - Future future = client.asyncGet(computeKeyHash(memcachedPrefix, key)); + Future future; + try { + future = client.asyncGet(computeKeyHash(memcachedPrefix, key)); + } catch(IllegalStateException e) { + // operation did not get queued in time (queue is full) + log.warn(e, "Unable to queue cache operation"); + return null; + } try { byte[] bytes = (byte[]) future.get(timeout, TimeUnit.MILLISECONDS); if(bytes != null) { @@ -140,7 +154,12 @@ public class MemcachedCache implements Cache @Override public void put(NamedKey key, byte[] value) { - client.set(computeKeyHash(memcachedPrefix, key), expiration, serializeValue(key, value)); + try { + client.set(computeKeyHash(memcachedPrefix, key), expiration, serializeValue(key, value)); + } catch(IllegalStateException e) { + // operation did not get queued in time (queue is full) + log.warn(e, "Unable to queue cache operation"); + } } private static byte[] serializeValue(NamedKey key, byte[] value) { @@ -183,7 +202,17 @@ public class MemcachedCache implements Cache } ); - BulkFuture> future = client.asyncGetBulk(keyLookup.keySet()); + Map results = Maps.newHashMap(); + + BulkFuture> future; + try { + future = client.asyncGetBulk(keyLookup.keySet()); + } catch(IllegalStateException e) { + timeoutCount.incrementAndGet(); + // operation did not get queued in time (queue is full) + log.warn(e, "Unable to queue cache operation"); + return results; + } try { Map some = future.getSome(timeout, TimeUnit.MILLISECONDS); @@ -195,7 +224,6 @@ public class MemcachedCache implements Cache missCount.addAndGet(keyLookup.size() - some.size()); hitCount.addAndGet(some.size()); - Map results = Maps.newHashMap(); for(Map.Entry entry : some.entrySet()) { final NamedKey key = keyLookup.get(entry.getKey()); final byte[] value = (byte[]) entry.getValue(); From 28373090759080358410d90dc33beab17604d727 Mon Sep 17 00:00:00 2001 From: xvrl Date: Wed, 6 Mar 2013 19:22:15 -0800 Subject: [PATCH 2/3] do not fail on exceptions when pulling from cache --- .../java/com/metamx/druid/client/cache/MemcachedCache.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/client/src/main/java/com/metamx/druid/client/cache/MemcachedCache.java b/client/src/main/java/com/metamx/druid/client/cache/MemcachedCache.java index 8562cbf202d..c2b6c32247c 100644 --- a/client/src/main/java/com/metamx/druid/client/cache/MemcachedCache.java +++ b/client/src/main/java/com/metamx/druid/client/cache/MemcachedCache.java @@ -147,7 +147,8 @@ public class MemcachedCache implements Cache throw Throwables.propagate(e); } catch(ExecutionException e) { - throw Throwables.propagate(e); + log.warn(e, "Exception pulling item from cache"); + return null; } } @@ -240,7 +241,8 @@ public class MemcachedCache implements Cache throw Throwables.propagate(e); } catch(ExecutionException e) { - throw Throwables.propagate(e); + log.warn(e, "Exception pulling item from cache"); + return results; } } From 1b51848a89074a0ba4659740ee3f0780322a4e4e Mon Sep 17 00:00:00 2001 From: xvrl Date: Thu, 7 Mar 2013 10:38:41 -0800 Subject: [PATCH 3/3] add cache errorCount --- .../com/metamx/druid/client/cache/CacheMonitor.java | 1 + .../com/metamx/druid/client/cache/CacheStats.java | 13 +++++++++++-- .../com/metamx/druid/client/cache/MapCache.java | 1 + .../metamx/druid/client/cache/MemcachedCache.java | 10 ++++++++-- 4 files changed, 21 insertions(+), 4 deletions(-) diff --git a/client/src/main/java/com/metamx/druid/client/cache/CacheMonitor.java b/client/src/main/java/com/metamx/druid/client/cache/CacheMonitor.java index b0c36629e89..e89c1113e8d 100644 --- a/client/src/main/java/com/metamx/druid/client/cache/CacheMonitor.java +++ b/client/src/main/java/com/metamx/druid/client/cache/CacheMonitor.java @@ -67,5 +67,6 @@ public class CacheMonitor extends AbstractMonitor emitter.emit(builder.build(String.format("%s/hitRate", metricPrefix), cacheStats.hitRate())); emitter.emit(builder.build(String.format("%s/averageBytes", metricPrefix), cacheStats.averageBytes())); emitter.emit(builder.build(String.format("%s/timeouts", metricPrefix), cacheStats.getNumTimeouts())); + emitter.emit(builder.build(String.format("%s/errors", metricPrefix), cacheStats.getNumErrors())); } } diff --git a/client/src/main/java/com/metamx/druid/client/cache/CacheStats.java b/client/src/main/java/com/metamx/druid/client/cache/CacheStats.java index 1a9950c8698..33f0a145082 100644 --- a/client/src/main/java/com/metamx/druid/client/cache/CacheStats.java +++ b/client/src/main/java/com/metamx/druid/client/cache/CacheStats.java @@ -29,6 +29,7 @@ public class CacheStats private final long sizeInBytes; private final long numEvictions; private final long numTimeouts; + private final long numErrors; public CacheStats( long numHits, @@ -36,7 +37,8 @@ public class CacheStats long size, long sizeInBytes, long numEvictions, - long numTimeouts + long numTimeouts, + long numErrors ) { this.numHits = numHits; @@ -45,6 +47,7 @@ public class CacheStats this.sizeInBytes = sizeInBytes; this.numEvictions = numEvictions; this.numTimeouts = numTimeouts; + this.numErrors = numErrors; } public long getNumHits() @@ -77,6 +80,11 @@ public class CacheStats return numTimeouts; } + public long getNumErrors() + { + return numErrors; + } + public long numLookups() { return numHits + numMisses; @@ -104,7 +112,8 @@ public class CacheStats size - oldStats.size, sizeInBytes - oldStats.sizeInBytes, numEvictions - oldStats.numEvictions, - numTimeouts - oldStats.numTimeouts + numTimeouts - oldStats.numTimeouts, + numErrors - oldStats.numErrors ); } } diff --git a/client/src/main/java/com/metamx/druid/client/cache/MapCache.java b/client/src/main/java/com/metamx/druid/client/cache/MapCache.java index 53e1e20280a..bf549ec31b9 100644 --- a/client/src/main/java/com/metamx/druid/client/cache/MapCache.java +++ b/client/src/main/java/com/metamx/druid/client/cache/MapCache.java @@ -76,6 +76,7 @@ public class MapCache implements Cache byteCountingLRUMap.size(), byteCountingLRUMap.getNumBytes(), byteCountingLRUMap.getEvictionCount(), + 0, 0 ); } diff --git a/client/src/main/java/com/metamx/druid/client/cache/MemcachedCache.java b/client/src/main/java/com/metamx/druid/client/cache/MemcachedCache.java index c2b6c32247c..436c606d496 100644 --- a/client/src/main/java/com/metamx/druid/client/cache/MemcachedCache.java +++ b/client/src/main/java/com/metamx/druid/client/cache/MemcachedCache.java @@ -91,6 +91,7 @@ public class MemcachedCache implements Cache private final AtomicLong hitCount = new AtomicLong(0); private final AtomicLong missCount = new AtomicLong(0); private final AtomicLong timeoutCount = new AtomicLong(0); + private final AtomicLong errorCount = new AtomicLong(0); MemcachedCache(MemcachedClientIF client, String memcachedPrefix, int timeout, int expiration) { Preconditions.checkArgument(memcachedPrefix.length() <= MAX_PREFIX_LENGTH, @@ -112,7 +113,8 @@ public class MemcachedCache implements Cache 0, 0, 0, - timeoutCount.get() + timeoutCount.get(), + errorCount.get() ); } @@ -124,6 +126,7 @@ public class MemcachedCache implements Cache future = client.asyncGet(computeKeyHash(memcachedPrefix, key)); } catch(IllegalStateException e) { // operation did not get queued in time (queue is full) + errorCount.incrementAndGet(); log.warn(e, "Unable to queue cache operation"); return null; } @@ -147,6 +150,7 @@ public class MemcachedCache implements Cache throw Throwables.propagate(e); } catch(ExecutionException e) { + errorCount.incrementAndGet(); log.warn(e, "Exception pulling item from cache"); return null; } @@ -159,6 +163,7 @@ public class MemcachedCache implements Cache client.set(computeKeyHash(memcachedPrefix, key), expiration, serializeValue(key, value)); } catch(IllegalStateException e) { // operation did not get queued in time (queue is full) + errorCount.incrementAndGet(); log.warn(e, "Unable to queue cache operation"); } } @@ -209,8 +214,8 @@ public class MemcachedCache implements Cache try { future = client.asyncGetBulk(keyLookup.keySet()); } catch(IllegalStateException e) { - timeoutCount.incrementAndGet(); // operation did not get queued in time (queue is full) + errorCount.incrementAndGet(); log.warn(e, "Unable to queue cache operation"); return results; } @@ -241,6 +246,7 @@ public class MemcachedCache implements Cache throw Throwables.propagate(e); } catch(ExecutionException e) { + errorCount.incrementAndGet(); log.warn(e, "Exception pulling item from cache"); return results; }