diff --git a/server/src/main/java/io/druid/client/cache/Cache.java b/server/src/main/java/io/druid/client/cache/Cache.java index a8a0b7340f7..90dd8429345 100644 --- a/server/src/main/java/io/druid/client/cache/Cache.java +++ b/server/src/main/java/io/druid/client/cache/Cache.java @@ -20,6 +20,7 @@ package io.druid.client.cache; import com.google.common.base.Preconditions; import com.google.common.primitives.Ints; import com.metamx.common.StringUtils; +import com.metamx.emitter.service.ServiceEmitter; import java.nio.ByteBuffer; import java.util.Arrays; @@ -39,6 +40,12 @@ public interface Cache public boolean isLocal(); + /** + * Custom metrics not covered by CacheStats may be emitted by this method. + * @param emitter The service emitter to emit on. + */ + public void doMonitor(ServiceEmitter emitter); + public class NamedKey { final public String namespace; diff --git a/server/src/main/java/io/druid/client/cache/CacheMonitor.java b/server/src/main/java/io/druid/client/cache/CacheMonitor.java index a2129690aa2..a479f11cc5d 100644 --- a/server/src/main/java/io/druid/client/cache/CacheMonitor.java +++ b/server/src/main/java/io/druid/client/cache/CacheMonitor.java @@ -49,6 +49,9 @@ public class CacheMonitor extends AbstractMonitor emitStats(emitter, "query/cache/total", currCacheStats, builder); prevCacheStats = currCacheStats; + + // Any custom cache statistics that need monitoring + cache.doMonitor(emitter); return true; } diff --git a/server/src/main/java/io/druid/client/cache/MapCache.java b/server/src/main/java/io/druid/client/cache/MapCache.java index 9b43188aabc..5e30676ea78 100644 --- a/server/src/main/java/io/druid/client/cache/MapCache.java +++ b/server/src/main/java/io/druid/client/cache/MapCache.java @@ -20,6 +20,7 @@ package io.druid.client.cache; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.primitives.Ints; +import com.metamx.emitter.service.ServiceEmitter; import java.nio.ByteBuffer; import java.util.Collections; @@ -132,7 +133,7 @@ public class MapCache implements Cache toRemove.add(next); } } - for(ByteBuffer key : toRemove) { + for (ByteBuffer key : toRemove) { baseMap.remove(key); } } @@ -163,4 +164,10 @@ public class MapCache implements Cache { return true; } + + @Override + public void doMonitor(ServiceEmitter emitter) + { + // No special monitoring + } } diff --git a/server/src/main/java/io/druid/client/cache/MemcachedCache.java b/server/src/main/java/io/druid/client/cache/MemcachedCache.java index d290f7f2405..dec2c3e4979 100644 --- a/server/src/main/java/io/druid/client/cache/MemcachedCache.java +++ b/server/src/main/java/io/druid/client/cache/MemcachedCache.java @@ -25,6 +25,7 @@ import com.google.common.base.Supplier; import com.google.common.base.Suppliers; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Maps; import com.google.common.hash.HashFunction; import com.google.common.hash.Hashing; @@ -33,7 +34,6 @@ import com.metamx.common.logger.Logger; import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceMetricEvent; import com.metamx.metrics.AbstractMonitor; -import com.metamx.metrics.MonitorScheduler; import io.druid.collections.LoadBalancingPool; import io.druid.collections.ResourceHolder; import io.druid.collections.StupidResourceHolder; @@ -58,6 +58,7 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; @@ -88,11 +89,11 @@ public class MemcachedCache implements Cache } }; - public static MemcachedCache create(final MemcachedCacheConfig config, MonitorScheduler emitter) + public static MemcachedCache create(final MemcachedCacheConfig config) { final ConcurrentMap counters = new ConcurrentHashMap<>(); final ConcurrentMap meters = new ConcurrentHashMap<>(); - emitter.addMonitor( + final AbstractMonitor monitor = new AbstractMonitor() { final AtomicReference> priorValues = new AtomicReference>(new HashMap()); @@ -134,8 +135,7 @@ public class MemcachedCache implements Cache } return builder.build(); } - } - ); + }; try { LZ4Transcoder transcoder = new LZ4Transcoder(config.getMaxObjectSize()); @@ -152,12 +152,24 @@ public class MemcachedCache implements Cache final Predicate interesting = new Predicate() { + // See net.spy.memcached.MemcachedConnection.registerMetrics() + private final Set interestingMetrics = ImmutableSet.of( + "[MEM] Reconnecting Nodes (ReconnectQueue)", + //"[MEM] Shutting Down Nodes (NodesToShutdown)", // Busted + "[MEM] Request Rate: All", + "[MEM] Average Bytes written to OS per write", + "[MEM] Average Bytes read from OS per read", + "[MEM] Average Time on wire for operations (µs)", + "[MEM] Response Rate: All (Failure + Success + Retry)", + "[MEM] Response Rate: Retry", + "[MEM] Response Rate: Failure", + "[MEM] Response Rate: Success" + ); + @Override public boolean apply(@Nullable String input) { - // See net.spy.memcached.MemcachedConnection.registerMetrics() - // in current version shutdown queue metric is borked - return input != null && !input.contains("Down"); + return input != null && interestingMetrics.contains(input); } }; @@ -169,7 +181,7 @@ public class MemcachedCache implements Cache if (!interesting.apply(name)) { return; } - counters.put(name, new AtomicLong(0L)); + counters.putIfAbsent(name, new AtomicLong(0L)); if (log.isDebugEnabled()) { log.debug("Add Counter [%s]", name); @@ -179,13 +191,8 @@ public class MemcachedCache implements Cache @Override public void removeCounter(String name) { - if (!interesting.apply(name)) { - return; - } - counters.remove(name); - if (log.isDebugEnabled()) { - log.debug("Remove Counter [%s]", name); + log.debug("Ignoring request to remove [%s]", name); } } @@ -264,7 +271,10 @@ public class MemcachedCache implements Cache @Override public void addMeter(String name) { - meters.put(name, new AtomicLong(0L)); + if (!interesting.apply(name)) { + return; + } + meters.putIfAbsent(name, new AtomicLong(0L)); if (log.isDebugEnabled()) { log.debug("Adding meter [%s]", name); } @@ -273,15 +283,20 @@ public class MemcachedCache implements Cache @Override public void removeMeter(String name) { - meters.remove(name); + if (!interesting.apply(name)) { + return; + } if (log.isDebugEnabled()) { - log.debug("Removing meter [%s]", name); + log.debug("Ignoring request to remove meter [%s]", name); } } @Override public void markMeter(String name) { + if (!interesting.apply(name)) { + return; + } AtomicLong meter = meters.get(name); if (meter == null) { meters.putIfAbsent(name, new AtomicLong(0L)); @@ -360,7 +375,7 @@ public class MemcachedCache implements Cache ); } - return new MemcachedCache(clientSupplier, config); + return new MemcachedCache(clientSupplier, config, monitor); } catch (IOException e) { throw Throwables.propagate(e); @@ -377,9 +392,14 @@ public class MemcachedCache implements Cache private final AtomicLong missCount = new AtomicLong(0); private final AtomicLong timeoutCount = new AtomicLong(0); private final AtomicLong errorCount = new AtomicLong(0); + private final AbstractMonitor monitor; - MemcachedCache(Supplier> client, MemcachedCacheConfig config) + MemcachedCache( + Supplier> client, + MemcachedCacheConfig config, + AbstractMonitor monitor + ) { Preconditions.checkArgument( config.getMemcachedPrefix().length() <= MAX_PREFIX_LENGTH, @@ -387,6 +407,7 @@ public class MemcachedCache implements Cache config.getMemcachedPrefix().length(), MAX_PREFIX_LENGTH ); + this.monitor = monitor; this.timeout = config.getTimeout(); this.expiration = config.getExpiration(); this.client = client; @@ -587,4 +608,10 @@ public class MemcachedCache implements Cache { return false; } + + @Override + public void doMonitor(ServiceEmitter emitter) + { + monitor.doMonitor(emitter); + } } diff --git a/server/src/main/java/io/druid/client/cache/MemcachedCacheProvider.java b/server/src/main/java/io/druid/client/cache/MemcachedCacheProvider.java index e01db9fadf3..d0b4e10aa06 100644 --- a/server/src/main/java/io/druid/client/cache/MemcachedCacheProvider.java +++ b/server/src/main/java/io/druid/client/cache/MemcachedCacheProvider.java @@ -17,26 +17,11 @@ package io.druid.client.cache; -import com.fasterxml.jackson.annotation.JacksonInject; -import com.fasterxml.jackson.annotation.JsonCreator; -import com.metamx.metrics.MonitorScheduler; - public class MemcachedCacheProvider extends MemcachedCacheConfig implements CacheProvider { - private final MonitorScheduler emitter; - - @JsonCreator - public MemcachedCacheProvider( - @JacksonInject - MonitorScheduler emitter - ) - { - this.emitter = emitter; - } - @Override public Cache get() { - return MemcachedCache.create(this, emitter); + return MemcachedCache.create(this); } } diff --git a/server/src/test/java/io/druid/client/cache/MemcachedCacheBenchmark.java b/server/src/test/java/io/druid/client/cache/MemcachedCacheBenchmark.java index db405ff7f30..e8f7df371f8 100644 --- a/server/src/test/java/io/druid/client/cache/MemcachedCacheBenchmark.java +++ b/server/src/test/java/io/druid/client/cache/MemcachedCacheBenchmark.java @@ -102,7 +102,7 @@ public class MemcachedCacheBenchmark extends SimpleBenchmark { return 3600; } - } + }, MemcachedCacheTest.NOOP_MONITOR ); randBytes = new byte[objectSize * 1024]; diff --git a/server/src/test/java/io/druid/client/cache/MemcachedCacheTest.java b/server/src/test/java/io/druid/client/cache/MemcachedCacheTest.java index 9b3017bd16a..520c5282655 100644 --- a/server/src/test/java/io/druid/client/cache/MemcachedCacheTest.java +++ b/server/src/test/java/io/druid/client/cache/MemcachedCacheTest.java @@ -33,7 +33,7 @@ import com.metamx.common.logger.Logger; import com.metamx.emitter.core.Emitter; import com.metamx.emitter.core.Event; import com.metamx.emitter.service.ServiceEmitter; -import com.metamx.metrics.Monitor; +import com.metamx.metrics.AbstractMonitor; import com.metamx.metrics.MonitorScheduler; import io.druid.collections.ResourceHolder; import io.druid.collections.StupidResourceHolder; @@ -56,7 +56,6 @@ import net.spy.memcached.internal.OperationFuture; import net.spy.memcached.ops.OperationStatus; import net.spy.memcached.transcoders.SerializingTranscoder; import net.spy.memcached.transcoders.Transcoder; -import org.easymock.Capture; import org.easymock.EasyMock; import org.junit.Assert; import org.junit.Before; @@ -84,6 +83,14 @@ public class MemcachedCacheTest private static final Logger log = new Logger(MemcachedCacheTest.class); private static final byte[] HI = "hiiiiiiiiiiiiiiiiiii".getBytes(); private static final byte[] HO = "hooooooooooooooooooo".getBytes(); + protected static final AbstractMonitor NOOP_MONITOR = new AbstractMonitor() + { + @Override + public boolean doMonitor(ServiceEmitter emitter) + { + return false; + } + }; private MemcachedCache cache; private final MemcachedCacheConfig memcachedCacheConfig = new MemcachedCacheConfig() { @@ -119,7 +126,8 @@ public class MemcachedCacheTest Suppliers.>ofInstance( StupidResourceHolder.create(new MockMemcachedClient()) ), - memcachedCacheConfig + memcachedCacheConfig, + NOOP_MONITOR ); } @@ -191,20 +199,8 @@ public class MemcachedCacheTest @Test public void testMonitor() throws Exception { - MonitorScheduler monitorScheduler = EasyMock.createNiceMock(MonitorScheduler.class); - Capture monitorCapture = Capture.newInstance(); - monitorScheduler.addMonitor(EasyMock.capture(monitorCapture)); - EasyMock.expectLastCall().once(); - EasyMock.replay(monitorScheduler); - MemcachedCache cache = MemcachedCache.create(memcachedCacheConfig, monitorScheduler); - EasyMock.verify(monitorScheduler); - - Assert.assertTrue(monitorCapture.hasCaptured()); - final Monitor monitor = monitorCapture.getValue(); - monitor.start(); - Assert.assertNotNull(monitor); - - Emitter emitter = EasyMock.createNiceMock(Emitter.class); + final MemcachedCache cache = MemcachedCache.create(memcachedCacheConfig); + final Emitter emitter = EasyMock.createNiceMock(Emitter.class); final Collection events = new ArrayList<>(); final ServiceEmitter serviceEmitter = new ServiceEmitter("service", "host", emitter) { @@ -217,7 +213,7 @@ public class MemcachedCacheTest while (events.isEmpty()) { Thread.sleep(memcachedCacheConfig.getTimeout()); - Assert.assertTrue(monitor.monitor(serviceEmitter)); + cache.doMonitor(serviceEmitter); } Assert.assertFalse(events.isEmpty()); @@ -288,20 +284,17 @@ public class MemcachedCacheTest class MemcachedProviderWithConfig extends MemcachedCacheProvider { private final MemcachedCacheConfig config; - private final MonitorScheduler emitter; @Inject - public MemcachedProviderWithConfig(MonitorScheduler emitter, MemcachedCacheConfig config) + public MemcachedProviderWithConfig(MemcachedCacheConfig config) { - super(emitter); - this.emitter = emitter; this.config = config; } @Override public Cache get() { - return MemcachedCache.create(config, emitter); + return MemcachedCache.create(config); } }