From 58138568198ec2001e32a59863e4c4ca414a134c Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Sat, 29 Aug 2015 08:15:26 -0700 Subject: [PATCH] More memcached metrics --- docs/content/operations/metrics.md | 8 + .../io/druid/client/cache/MemcachedCache.java | 341 +++++++++++++++--- .../client/cache/MemcachedCacheProvider.java | 13 +- .../client/cache/MemcachedCacheTest.java | 159 +++++++- 4 files changed, 450 insertions(+), 71 deletions(-) diff --git a/docs/content/operations/metrics.md b/docs/content/operations/metrics.md index 633c9805bca..466f5351754 100644 --- a/docs/content/operations/metrics.md +++ b/docs/content/operations/metrics.md @@ -71,6 +71,14 @@ Available Metrics |`*/timeouts`|Number of cache timeouts.||0| |`*/errors`|Number of cache errors.||0| +#### Memcached only metrics +Memcached client metrics are reported as follows. These metrics come directly from the memcached client rather than from the cache retrieval layer. +|Metric|Description|Dimensions|Normal Value| +|------|-----------|----------|------------| +|`query/cache/memcached/total`|Cache metrics unique to memcached (only if `druid.cache.type=memcached`), reported as their current absolute values|Variable|N/A| +|`query/cache/memcached/delta`|Cache metrics unique to memcached (only if `druid.cache.type=memcached`), reported as the change since the previous metric emission|Variable|N/A| + + ## Ingestion Metrics |Metric|Description|Dimensions|Normal Value| diff --git a/server/src/main/java/io/druid/client/cache/MemcachedCache.java b/server/src/main/java/io/druid/client/cache/MemcachedCache.java index 96abb239aff..363e6ad4d37 100644 --- a/server/src/main/java/io/druid/client/cache/MemcachedCache.java +++ b/server/src/main/java/io/druid/client/cache/MemcachedCache.java @@ -20,12 +20,18 @@ package io.druid.client.cache; import com.google.common.base.Charsets; import com.google.common.base.Function; import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; import com.google.common.base.Throwables; 
+import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import com.google.common.hash.HashFunction; import com.google.common.hash.Hashing; import com.google.common.primitives.Ints; import com.metamx.common.logger.Logger; +import com.metamx.emitter.service.ServiceEmitter; +import com.metamx.emitter.service.ServiceMetricEvent; +import com.metamx.metrics.AbstractMonitor; +import com.metamx.metrics.MonitorScheduler; import net.spy.memcached.AddrUtil; import net.spy.memcached.ConnectionFactoryBuilder; import net.spy.memcached.FailureMode; @@ -33,6 +39,8 @@ import net.spy.memcached.HashAlgorithm; import net.spy.memcached.MemcachedClient; import net.spy.memcached.MemcachedClientIF; import net.spy.memcached.internal.BulkFuture; +import net.spy.memcached.metrics.MetricCollector; +import net.spy.memcached.metrics.MetricType; import net.spy.memcached.ops.LinkedOperationQueueFactory; import net.spy.memcached.ops.OperationQueueFactory; import org.apache.commons.codec.digest.DigestUtils; @@ -41,12 +49,16 @@ import javax.annotation.Nullable; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Arrays; +import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; public class MemcachedCache implements Cache { @@ -69,8 +81,54 @@ public class MemcachedCache implements Cache } }; - public static MemcachedCache create(final MemcachedCacheConfig config) + public static MemcachedCache create(final MemcachedCacheConfig config, MonitorScheduler emitter) { + final ConcurrentMap counters = new ConcurrentHashMap<>(); + final ConcurrentMap meters = new ConcurrentHashMap<>(); + emitter.addMonitor( + new 
AbstractMonitor() + { + final AtomicReference> priorValues = new AtomicReference>(new HashMap()); + + @Override + public boolean doMonitor(ServiceEmitter emitter) + { + final Map priorValues = this.priorValues.get(); + final Map currentValues = getCurrentValues(); + final ServiceMetricEvent.Builder builder = ServiceMetricEvent.builder(); + for (Map.Entry entry : currentValues.entrySet()) { + emitter.emit( + builder.setDimension("memcached metric", entry.getKey()) + .build("query/cache/memcached/total", entry.getValue()) + ); + final Long prior = priorValues.get(entry.getKey()); + if (prior != null) { + emitter.emit( + builder.setDimension("memcached metric", entry.getKey()).build( + "query/cache/memcached/delta", + entry.getValue() - prior + ) + ); + } + } + + if (!this.priorValues.compareAndSet(priorValues, currentValues)) { + log.error("Prior value changed while I was reporting! updating anyways"); + this.priorValues.set(currentValues); + } + return true; + } + + private Map getCurrentValues() + { + final ImmutableMap.Builder builder = ImmutableMap.builder(); + for (Map.Entry entry : counters.entrySet()) { + builder.put(entry.getKey(), entry.getValue().get()); + } + return builder.build(); + } + } + ); try { LZ4Transcoder transcoder = new LZ4Transcoder(config.getMaxObjectSize()); @@ -79,35 +137,203 @@ public class MemcachedCache implements Cache OperationQueueFactory opQueueFactory; long maxQueueBytes = config.getMaxOperationQueueSize(); - if(maxQueueBytes > 0) { + if (maxQueueBytes > 0) { opQueueFactory = new MemcachedOperationQueueFactory(maxQueueBytes); } else { opQueueFactory = new LinkedOperationQueueFactory(); } + final Predicate interesting = new Predicate() + { + @Override + public boolean apply(@Nullable String input) + { + // See net.spy.memcached.MemcachedConnection.registerMetrics() + // in current version shutdown queue metric is borked + return input != null && !input.contains("Down"); + } + }; + return new MemcachedCache( - new MemcachedClient( - 
new MemcachedCustomConnectionFactoryBuilder() - // 1000 repetitions gives us good distribution with murmur3_128 - // (approx < 5% difference in counts across nodes, with 5 cache nodes) - .setKetamaNodeRepetitions(1000) - .setHashAlg(MURMUR3_128) - .setProtocol(ConnectionFactoryBuilder.Protocol.BINARY) - .setLocatorType(ConnectionFactoryBuilder.Locator.CONSISTENT) - .setDaemon(true) - .setFailureMode(FailureMode.Cancel) - .setTranscoder(transcoder) - .setShouldOptimize(true) - .setOpQueueMaxBlockTime(config.getTimeout()) - .setOpTimeout(config.getTimeout()) - .setReadBufferSize(config.getReadBufferSize()) - .setOpQueueFactory(opQueueFactory) - .build(), - AddrUtil.getAddresses(config.getHosts()) - ), - config + new MemcachedClient( + new MemcachedCustomConnectionFactoryBuilder() + // 1000 repetitions gives us good distribution with murmur3_128 + // (approx < 5% difference in counts across nodes, with 5 cache nodes) + .setKetamaNodeRepetitions(1000) + .setHashAlg(MURMUR3_128) + .setProtocol(ConnectionFactoryBuilder.Protocol.BINARY) + .setLocatorType(ConnectionFactoryBuilder.Locator.CONSISTENT) + .setDaemon(true) + .setFailureMode(FailureMode.Cancel) + .setTranscoder(transcoder) + .setShouldOptimize(true) + .setOpQueueMaxBlockTime(config.getTimeout()) + .setOpTimeout(config.getTimeout()) + .setReadBufferSize(config.getReadBufferSize()) + .setOpQueueFactory(opQueueFactory) + .setEnableMetrics(MetricType.DEBUG) // Not as scary as it sounds + .setWriteOpQueueFactory(opQueueFactory) + .setReadOpQueueFactory(opQueueFactory) + .setMetricCollector( + new MetricCollector() + { + @Override + public void addCounter(String name) + { + if (!interesting.apply(name)) { + return; + } + counters.put(name, new AtomicLong(0L)); + + if (log.isDebugEnabled()) { + log.debug("Add Counter [%s]", name); + } + } + + @Override + public void removeCounter(String name) + { + if (!interesting.apply(name)) { + return; + } + counters.remove(name); + + if (log.isDebugEnabled()) { + log.debug("Remove 
Counter [%s]", name); + } + } + + @Override + public void incrementCounter(String name) + { + if (!interesting.apply(name)) { + return; + } + AtomicLong counter = counters.get(name); + if (counter == null) { + counters.putIfAbsent(name, new AtomicLong(0)); + counter = counters.get(name); + } + counter.incrementAndGet(); + + if (log.isDebugEnabled()) { + log.debug("Incrament [%s]", name); + } + } + + @Override + public void incrementCounter(String name, int amount) + { + if (!interesting.apply(name)) { + return; + } + AtomicLong counter = counters.get(name); + if (counter == null) { + counters.putIfAbsent(name, new AtomicLong(0)); + counter = counters.get(name); + } + counter.addAndGet(amount); + + if (log.isDebugEnabled()) { + log.debug("Increment [%s] %d", name, amount); + } + } + + @Override + public void decrementCounter(String name) + { + if (!interesting.apply(name)) { + return; + } + AtomicLong counter = counters.get(name); + if (counter == null) { + counters.putIfAbsent(name, new AtomicLong(0)); + counter = counters.get(name); + } + counter.decrementAndGet(); + + if (log.isDebugEnabled()) { + log.debug("Decrement [%s]", name); + } + } + + @Override + public void decrementCounter(String name, int amount) + { + if (!interesting.apply(name)) { + return; + } + AtomicLong counter = counters.get(name); + if (counter == null) { + counters.putIfAbsent(name, new AtomicLong(0L)); + counter = counters.get(name); + } + counter.addAndGet(-amount); + + if (log.isDebugEnabled()) { + log.debug("Decrement [%s] %d", name, amount); + } + } + + @Override + public void addMeter(String name) + { + meters.put(name, new AtomicLong(0L)); + if (log.isDebugEnabled()) { + log.debug("Adding meter [%s]", name); + } + } + + @Override + public void removeMeter(String name) + { + meters.remove(name); + if (log.isDebugEnabled()) { + log.debug("Removing meter [%s]", name); + } + } + + @Override + public void markMeter(String name) + { + AtomicLong meter = meters.get(name); + if (meter == 
null) { + meters.putIfAbsent(name, new AtomicLong(0L)); + meter = meters.get(name); + } + meter.incrementAndGet(); + + if (log.isDebugEnabled()) { + log.debug("Increment counter [%s]", name); + } + } + + @Override + public void addHistogram(String name) + { + log.debug("Ignoring add histogram [%s]", name); + } + + @Override + public void removeHistogram(String name) + { + log.debug("Ignoring remove histogram [%s]", name); + } + + @Override + public void updateHistogram(String name, int amount) + { + log.debug("Ignoring update histogram [%s]: %d", name, amount); + } + } + ) + .build(), + AddrUtil.getAddresses(config.getHosts()) + ), + config ); - } catch(IOException e) { + } + catch (IOException e) { throw Throwables.propagate(e); } } @@ -124,11 +350,14 @@ public class MemcachedCache implements Cache private final AtomicLong errorCount = new AtomicLong(0); - MemcachedCache(MemcachedClientIF client, MemcachedCacheConfig config) { - Preconditions.checkArgument(config.getMemcachedPrefix().length() <= MAX_PREFIX_LENGTH, - "memcachedPrefix length [%d] exceeds maximum length [%d]", - config.getMemcachedPrefix().length(), - MAX_PREFIX_LENGTH); + MemcachedCache(MemcachedClientIF client, MemcachedCacheConfig config) + { + Preconditions.checkArgument( + config.getMemcachedPrefix().length() <= MAX_PREFIX_LENGTH, + "memcachedPrefix length [%d] exceeds maximum length [%d]", + config.getMemcachedPrefix().length(), + MAX_PREFIX_LENGTH + ); this.timeout = config.getTimeout(); this.expiration = config.getExpiration(); this.client = client; @@ -155,7 +384,8 @@ public class MemcachedCache implements Cache Future future; try { future = client.asyncGet(computeKeyHash(memcachedPrefix, key)); - } catch(IllegalStateException e) { + } + catch (IllegalStateException e) { // operation did not get queued in time (queue is full) errorCount.incrementAndGet(); log.warn(e, "Unable to queue cache operation"); @@ -163,24 +393,23 @@ public class MemcachedCache implements Cache } try { byte[] bytes = 
(byte[]) future.get(timeout, TimeUnit.MILLISECONDS); - if(bytes != null) { + if (bytes != null) { hitCount.incrementAndGet(); - } - else { + } else { missCount.incrementAndGet(); } return bytes == null ? null : deserializeValue(key, bytes); } - catch(TimeoutException e) { + catch (TimeoutException e) { timeoutCount.incrementAndGet(); future.cancel(false); return null; } - catch(InterruptedException e) { + catch (InterruptedException e) { Thread.currentThread().interrupt(); throw Throwables.propagate(e); } - catch(ExecutionException e) { + catch (ExecutionException e) { errorCount.incrementAndGet(); log.warn(e, "Exception pulling item from cache"); return null; @@ -192,14 +421,16 @@ public class MemcachedCache implements Cache { try { client.set(computeKeyHash(memcachedPrefix, key), expiration, serializeValue(key, value)); - } catch(IllegalStateException e) { + } + catch (IllegalStateException e) { // operation did not get queued in time (queue is full) errorCount.incrementAndGet(); log.warn(e, "Unable to queue cache operation"); } } - private static byte[] serializeValue(NamedKey key, byte[] value) { + private static byte[] serializeValue(NamedKey key, byte[] value) + { byte[] keyBytes = key.toByteArray(); return ByteBuffer.allocate(Ints.BYTES + keyBytes.length + value.length) .putInt(keyBytes.length) @@ -208,7 +439,8 @@ public class MemcachedCache implements Cache .array(); } - private static byte[] deserializeValue(NamedKey key, byte[] bytes) { + private static byte[] deserializeValue(NamedKey key, byte[] bytes) + { ByteBuffer buf = ByteBuffer.wrap(bytes); final int keyLength = buf.getInt(); @@ -217,8 +449,10 @@ public class MemcachedCache implements Cache byte[] value = new byte[buf.remaining()]; buf.get(value); - Preconditions.checkState(Arrays.equals(keyBytes, key.toByteArray()), - "Keys do not match, possible hash collision?"); + Preconditions.checkState( + Arrays.equals(keyBytes, key.toByteArray()), + "Keys do not match, possible hash collision?" 
+ ); return value; } @@ -244,7 +478,8 @@ public class MemcachedCache implements Cache BulkFuture> future; try { future = client.asyncGetBulk(keyLookup.keySet()); - } catch(IllegalStateException e) { + } + catch (IllegalStateException e) { // operation did not get queued in time (queue is full) errorCount.incrementAndGet(); log.warn(e, "Unable to queue cache operation"); @@ -254,14 +489,14 @@ public class MemcachedCache implements Cache try { Map some = future.getSome(timeout, TimeUnit.MILLISECONDS); - if(future.isTimeout()) { + if (future.isTimeout()) { future.cancel(false); timeoutCount.incrementAndGet(); } missCount.addAndGet(keyLookup.size() - some.size()); hitCount.addAndGet(some.size()); - for(Map.Entry entry : some.entrySet()) { + for (Map.Entry entry : some.entrySet()) { final NamedKey key = keyLookup.get(entry.getKey()); final byte[] value = (byte[]) entry.getValue(); results.put( @@ -272,11 +507,11 @@ public class MemcachedCache implements Cache return results; } - catch(InterruptedException e) { + catch (InterruptedException e) { Thread.currentThread().interrupt(); throw Throwables.propagate(e); } - catch(ExecutionException e) { + catch (ExecutionException e) { errorCount.incrementAndGet(); log.warn(e, "Exception pulling item from cache"); return results; @@ -290,18 +525,20 @@ public class MemcachedCache implements Cache } public static final int MAX_PREFIX_LENGTH = - MemcachedClientIF.MAX_KEY_LENGTH - - 40 // length of namespace hash - - 40 // length of key hash - - 2 // length of separators - ; + MemcachedClientIF.MAX_KEY_LENGTH + - 40 // length of namespace hash + - 40 // length of key hash + - 2 // length of separators + ; - private static String computeKeyHash(String memcachedPrefix, NamedKey key) { + private static String computeKeyHash(String memcachedPrefix, NamedKey key) + { // hash keys to keep things under 250 characters for memcached return memcachedPrefix + ":" + DigestUtils.sha1Hex(key.namespace) + ":" + DigestUtils.sha1Hex(key.key); } - 
public boolean isLocal() { + public boolean isLocal() + { return false; } } diff --git a/server/src/main/java/io/druid/client/cache/MemcachedCacheProvider.java b/server/src/main/java/io/druid/client/cache/MemcachedCacheProvider.java index d0b4e10aa06..4d71d54ca7f 100644 --- a/server/src/main/java/io/druid/client/cache/MemcachedCacheProvider.java +++ b/server/src/main/java/io/druid/client/cache/MemcachedCacheProvider.java @@ -17,11 +17,22 @@ package io.druid.client.cache; +import com.google.inject.Inject; +import com.metamx.metrics.MonitorScheduler; + public class MemcachedCacheProvider extends MemcachedCacheConfig implements CacheProvider { + private final MonitorScheduler emitter; + + @Inject + public MemcachedCacheProvider(MonitorScheduler emitter) + { + this.emitter = emitter; + } + @Override public Cache get() { - return MemcachedCache.create(this); + return MemcachedCache.create(this, emitter); } } diff --git a/server/src/test/java/io/druid/client/cache/MemcachedCacheTest.java b/server/src/test/java/io/druid/client/cache/MemcachedCacheTest.java index 3da2276c44f..44b8916bc43 100644 --- a/server/src/test/java/io/druid/client/cache/MemcachedCacheTest.java +++ b/server/src/test/java/io/druid/client/cache/MemcachedCacheTest.java @@ -17,9 +17,27 @@ package io.druid.client.cache; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.primitives.Ints; +import com.google.inject.Binder; +import com.google.inject.Inject; +import com.google.inject.Injector; +import com.google.inject.Module; +import com.google.inject.name.Names; +import com.metamx.common.lifecycle.Lifecycle; +import com.metamx.common.logger.Logger; +import com.metamx.emitter.core.Emitter; +import com.metamx.emitter.core.Event; +import com.metamx.emitter.service.ServiceEmitter; +import com.metamx.metrics.Monitor; +import com.metamx.metrics.MonitorScheduler; 
+import io.druid.guice.GuiceInjectors; +import io.druid.guice.ManageLifecycle; +import io.druid.initialization.Initialization; +import io.druid.jackson.DefaultObjectMapper; import net.spy.memcached.BroadcastOpFactory; import net.spy.memcached.CASResponse; import net.spy.memcached.CASValue; @@ -34,11 +52,14 @@ import net.spy.memcached.internal.OperationFuture; import net.spy.memcached.ops.OperationStatus; import net.spy.memcached.transcoders.SerializingTranscoder; import net.spy.memcached.transcoders.Transcoder; +import org.easymock.Capture; +import org.easymock.EasyMock; import org.junit.Assert; import org.junit.Before; import org.junit.Test; import java.net.SocketAddress; +import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; import java.util.Map; @@ -55,37 +76,119 @@ import java.util.concurrent.TimeoutException; */ public class MemcachedCacheTest { + private static final Logger log = new Logger(MemcachedCacheTest.class); private static final byte[] HI = "hiiiiiiiiiiiiiiiiiii".getBytes(); private static final byte[] HO = "hooooooooooooooooooo".getBytes(); private MemcachedCache cache; + private final MemcachedCacheConfig memcachedCacheConfig = new MemcachedCacheConfig() + { + @Override + public String getMemcachedPrefix() + { + return "druid-memcached-test"; + } + + @Override + public int getTimeout() + { + return 10; + } + + @Override + public int getExpiration() + { + return 3600; + } + + @Override + public String getHosts() + { + return "localhost:9999"; + } + }; @Before public void setUp() throws Exception { MemcachedClientIF client = new MockMemcachedClient(); - cache = new MemcachedCache( - client, new MemcachedCacheConfig() + cache = new MemcachedCache(client, memcachedCacheConfig); + } + + @Test + public void testBasicInjection() throws Exception + { + final MemcachedCacheConfig config = new MemcachedCacheConfig() { @Override - public String getMemcachedPrefix() + public String getHosts() { - return "druid-memcached-test"; + 
return "127.0.0.1:22"; } + }; + Injector injector = Initialization.makeInjectorWithModules( + GuiceInjectors.makeStartupInjector(), ImmutableList.of( + new Module() + { + @Override + public void configure(Binder binder) + { + binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/test/memcached"); + binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0); - @Override - public int getTimeout() - { - return 500; - } - - @Override - public int getExpiration() - { - return 3600; - } - - } + binder.bind(MemcachedCacheConfig.class).toInstance(config); + binder.bind(Cache.class).toProvider(MemcachedProviderWithConfig.class).in(ManageLifecycle.class); + } + } + ) ); + Lifecycle lifecycle = injector.getInstance(Lifecycle.class); + lifecycle.start(); + try { + Cache cache = injector.getInstance(Cache.class); + Assert.assertEquals(MemcachedCache.class, cache.getClass()); + } + finally { + lifecycle.stop(); + } + } + + @Test + public void testMonitor() throws Exception + { + MonitorScheduler monitorScheduler = EasyMock.createNiceMock(MonitorScheduler.class); + Capture monitorCapture = Capture.newInstance(); + monitorScheduler.addMonitor(EasyMock.capture(monitorCapture)); + EasyMock.expectLastCall().once(); + EasyMock.replay(monitorScheduler); + MemcachedCache cache = MemcachedCache.create(memcachedCacheConfig, monitorScheduler); + EasyMock.verify(monitorScheduler); + + Assert.assertTrue(monitorCapture.hasCaptured()); + final Monitor monitor = monitorCapture.getValue(); + monitor.start(); + Assert.assertNotNull(monitor); + + Emitter emitter = EasyMock.createNiceMock(Emitter.class); + final Collection events = new ArrayList<>(); + final ServiceEmitter serviceEmitter = new ServiceEmitter("service", "host", emitter) + { + @Override + public void emit(Event event) + { + events.add(event); + } + }; + + while (events.isEmpty()) { + Thread.sleep(memcachedCacheConfig.getTimeout()); + Assert.assertTrue(monitor.monitor(serviceEmitter)); + } + + 
Assert.assertFalse(events.isEmpty()); + ObjectMapper mapper = new DefaultObjectMapper(); + for (Event event : events) { + log.debug("Found event `%s`", mapper.writeValueAsString(event.toMap())); + } } @Test @@ -146,6 +249,26 @@ public class MemcachedCacheTest } } +class MemcachedProviderWithConfig extends MemcachedCacheProvider +{ + private final MemcachedCacheConfig config; + private final MonitorScheduler emitter; + + @Inject + public MemcachedProviderWithConfig(MonitorScheduler emitter, MemcachedCacheConfig config) + { + super(emitter); + this.emitter = emitter; + this.config = config; + } + + @Override + public Cache get() + { + return MemcachedCache.create(config, emitter); + } +} + class MockMemcachedClient implements MemcachedClientIF { private final ConcurrentMap theMap = new ConcurrentHashMap(); @@ -864,4 +987,4 @@ class MockMemcachedClient implements MemcachedClientIF { throw new UnsupportedOperationException("not implemented"); } -}; +}