Update memcached client for better concurrency in metrics. Also fixes another injection problem

This commit is contained in:
Charles Allen 2015-09-21 18:27:46 -07:00
parent 63a3a4a254
commit e6f07a832d
7 changed files with 83 additions and 61 deletions

View File

@ -20,6 +20,7 @@ package io.druid.client.cache;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Ints;
import com.metamx.common.StringUtils;
import com.metamx.emitter.service.ServiceEmitter;
import java.nio.ByteBuffer;
import java.util.Arrays;
@ -39,6 +40,12 @@ public interface Cache
public boolean isLocal();
/**
* Custom metrics not covered by CacheStats may be emitted by this method.
* @param emitter The service emitter to emit on.
*/
public void doMonitor(ServiceEmitter emitter);
public class NamedKey
{
final public String namespace;

View File

@ -49,6 +49,9 @@ public class CacheMonitor extends AbstractMonitor
emitStats(emitter, "query/cache/total", currCacheStats, builder);
prevCacheStats = currCacheStats;
// Any custom cache statistics that need monitoring
cache.doMonitor(emitter);
return true;
}

View File

@ -20,6 +20,7 @@ package io.druid.client.cache;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.primitives.Ints;
import com.metamx.emitter.service.ServiceEmitter;
import java.nio.ByteBuffer;
import java.util.Collections;
@ -132,7 +133,7 @@ public class MapCache implements Cache
toRemove.add(next);
}
}
for(ByteBuffer key : toRemove) {
for (ByteBuffer key : toRemove) {
baseMap.remove(key);
}
}
@ -163,4 +164,10 @@ public class MapCache implements Cache
{
return true;
}
@Override
public void doMonitor(ServiceEmitter emitter)
{
// No special monitoring
}
}

View File

@ -25,6 +25,7 @@ import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
@ -33,7 +34,6 @@ import com.metamx.common.logger.Logger;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import com.metamx.metrics.AbstractMonitor;
import com.metamx.metrics.MonitorScheduler;
import io.druid.collections.LoadBalancingPool;
import io.druid.collections.ResourceHolder;
import io.druid.collections.StupidResourceHolder;
@ -58,6 +58,7 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
@ -88,11 +89,11 @@ public class MemcachedCache implements Cache
}
};
public static MemcachedCache create(final MemcachedCacheConfig config, MonitorScheduler emitter)
public static MemcachedCache create(final MemcachedCacheConfig config)
{
final ConcurrentMap<String, AtomicLong> counters = new ConcurrentHashMap<>();
final ConcurrentMap<String, AtomicLong> meters = new ConcurrentHashMap<>();
emitter.addMonitor(
final AbstractMonitor monitor =
new AbstractMonitor()
{
final AtomicReference<Map<String, Long>> priorValues = new AtomicReference<Map<String, Long>>(new HashMap<String, Long>());
@ -134,8 +135,7 @@ public class MemcachedCache implements Cache
}
return builder.build();
}
}
);
};
try {
LZ4Transcoder transcoder = new LZ4Transcoder(config.getMaxObjectSize());
@ -152,12 +152,24 @@ public class MemcachedCache implements Cache
final Predicate<String> interesting = new Predicate<String>()
{
// See net.spy.memcached.MemcachedConnection.registerMetrics()
private final Set<String> interestingMetrics = ImmutableSet.of(
"[MEM] Reconnecting Nodes (ReconnectQueue)",
//"[MEM] Shutting Down Nodes (NodesToShutdown)", // Busted
"[MEM] Request Rate: All",
"[MEM] Average Bytes written to OS per write",
"[MEM] Average Bytes read from OS per read",
"[MEM] Average Time on wire for operations (µs)",
"[MEM] Response Rate: All (Failure + Success + Retry)",
"[MEM] Response Rate: Retry",
"[MEM] Response Rate: Failure",
"[MEM] Response Rate: Success"
);
@Override
public boolean apply(@Nullable String input)
{
// See net.spy.memcached.MemcachedConnection.registerMetrics()
// in current version shutdown queue metric is borked
return input != null && !input.contains("Down");
return input != null && interestingMetrics.contains(input);
}
};
@ -169,7 +181,7 @@ public class MemcachedCache implements Cache
if (!interesting.apply(name)) {
return;
}
counters.put(name, new AtomicLong(0L));
counters.putIfAbsent(name, new AtomicLong(0L));
if (log.isDebugEnabled()) {
log.debug("Add Counter [%s]", name);
@ -179,13 +191,8 @@ public class MemcachedCache implements Cache
@Override
public void removeCounter(String name)
{
if (!interesting.apply(name)) {
return;
}
counters.remove(name);
if (log.isDebugEnabled()) {
log.debug("Remove Counter [%s]", name);
log.debug("Ignoring request to remove [%s]", name);
}
}
@ -264,7 +271,10 @@ public class MemcachedCache implements Cache
@Override
public void addMeter(String name)
{
meters.put(name, new AtomicLong(0L));
if (!interesting.apply(name)) {
return;
}
meters.putIfAbsent(name, new AtomicLong(0L));
if (log.isDebugEnabled()) {
log.debug("Adding meter [%s]", name);
}
@ -273,15 +283,20 @@ public class MemcachedCache implements Cache
@Override
public void removeMeter(String name)
{
meters.remove(name);
if (!interesting.apply(name)) {
return;
}
if (log.isDebugEnabled()) {
log.debug("Removing meter [%s]", name);
log.debug("Ignoring request to remove meter [%s]", name);
}
}
@Override
public void markMeter(String name)
{
if (!interesting.apply(name)) {
return;
}
AtomicLong meter = meters.get(name);
if (meter == null) {
meters.putIfAbsent(name, new AtomicLong(0L));
@ -360,7 +375,7 @@ public class MemcachedCache implements Cache
);
}
return new MemcachedCache(clientSupplier, config);
return new MemcachedCache(clientSupplier, config, monitor);
}
catch (IOException e) {
throw Throwables.propagate(e);
@ -377,9 +392,14 @@ public class MemcachedCache implements Cache
private final AtomicLong missCount = new AtomicLong(0);
private final AtomicLong timeoutCount = new AtomicLong(0);
private final AtomicLong errorCount = new AtomicLong(0);
private final AbstractMonitor monitor;
MemcachedCache(Supplier<ResourceHolder<MemcachedClientIF>> client, MemcachedCacheConfig config)
MemcachedCache(
Supplier<ResourceHolder<MemcachedClientIF>> client,
MemcachedCacheConfig config,
AbstractMonitor monitor
)
{
Preconditions.checkArgument(
config.getMemcachedPrefix().length() <= MAX_PREFIX_LENGTH,
@ -387,6 +407,7 @@ public class MemcachedCache implements Cache
config.getMemcachedPrefix().length(),
MAX_PREFIX_LENGTH
);
this.monitor = monitor;
this.timeout = config.getTimeout();
this.expiration = config.getExpiration();
this.client = client;
@ -587,4 +608,10 @@ public class MemcachedCache implements Cache
{
return false;
}
@Override
public void doMonitor(ServiceEmitter emitter)
{
monitor.doMonitor(emitter);
}
}

View File

@ -17,26 +17,11 @@
package io.druid.client.cache;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.metamx.metrics.MonitorScheduler;
public class MemcachedCacheProvider extends MemcachedCacheConfig implements CacheProvider
{
private final MonitorScheduler emitter;
@JsonCreator
public MemcachedCacheProvider(
@JacksonInject
MonitorScheduler emitter
)
{
this.emitter = emitter;
}
@Override
public Cache get()
{
return MemcachedCache.create(this, emitter);
return MemcachedCache.create(this);
}
}

View File

@ -102,7 +102,7 @@ public class MemcachedCacheBenchmark extends SimpleBenchmark
{
return 3600;
}
}
}, MemcachedCacheTest.NOOP_MONITOR
);
randBytes = new byte[objectSize * 1024];

View File

@ -33,7 +33,7 @@ import com.metamx.common.logger.Logger;
import com.metamx.emitter.core.Emitter;
import com.metamx.emitter.core.Event;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.metrics.Monitor;
import com.metamx.metrics.AbstractMonitor;
import com.metamx.metrics.MonitorScheduler;
import io.druid.collections.ResourceHolder;
import io.druid.collections.StupidResourceHolder;
@ -56,7 +56,6 @@ import net.spy.memcached.internal.OperationFuture;
import net.spy.memcached.ops.OperationStatus;
import net.spy.memcached.transcoders.SerializingTranscoder;
import net.spy.memcached.transcoders.Transcoder;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
@ -84,6 +83,14 @@ public class MemcachedCacheTest
private static final Logger log = new Logger(MemcachedCacheTest.class);
private static final byte[] HI = "hiiiiiiiiiiiiiiiiiii".getBytes();
private static final byte[] HO = "hooooooooooooooooooo".getBytes();
protected static final AbstractMonitor NOOP_MONITOR = new AbstractMonitor()
{
@Override
public boolean doMonitor(ServiceEmitter emitter)
{
return false;
}
};
private MemcachedCache cache;
private final MemcachedCacheConfig memcachedCacheConfig = new MemcachedCacheConfig()
{
@ -119,7 +126,8 @@ public class MemcachedCacheTest
Suppliers.<ResourceHolder<MemcachedClientIF>>ofInstance(
StupidResourceHolder.<MemcachedClientIF>create(new MockMemcachedClient())
),
memcachedCacheConfig
memcachedCacheConfig,
NOOP_MONITOR
);
}
@ -191,20 +199,8 @@ public class MemcachedCacheTest
@Test
public void testMonitor() throws Exception
{
MonitorScheduler monitorScheduler = EasyMock.createNiceMock(MonitorScheduler.class);
Capture<? extends Monitor> monitorCapture = Capture.newInstance();
monitorScheduler.addMonitor(EasyMock.capture(monitorCapture));
EasyMock.expectLastCall().once();
EasyMock.replay(monitorScheduler);
MemcachedCache cache = MemcachedCache.create(memcachedCacheConfig, monitorScheduler);
EasyMock.verify(monitorScheduler);
Assert.assertTrue(monitorCapture.hasCaptured());
final Monitor monitor = monitorCapture.getValue();
monitor.start();
Assert.assertNotNull(monitor);
Emitter emitter = EasyMock.createNiceMock(Emitter.class);
final MemcachedCache cache = MemcachedCache.create(memcachedCacheConfig);
final Emitter emitter = EasyMock.createNiceMock(Emitter.class);
final Collection<Event> events = new ArrayList<>();
final ServiceEmitter serviceEmitter = new ServiceEmitter("service", "host", emitter)
{
@ -217,7 +213,7 @@ public class MemcachedCacheTest
while (events.isEmpty()) {
Thread.sleep(memcachedCacheConfig.getTimeout());
Assert.assertTrue(monitor.monitor(serviceEmitter));
cache.doMonitor(serviceEmitter);
}
Assert.assertFalse(events.isEmpty());
@ -288,20 +284,17 @@ public class MemcachedCacheTest
class MemcachedProviderWithConfig extends MemcachedCacheProvider
{
private final MemcachedCacheConfig config;
private final MonitorScheduler emitter;
@Inject
public MemcachedProviderWithConfig(MonitorScheduler emitter, MemcachedCacheConfig config)
public MemcachedProviderWithConfig(MemcachedCacheConfig config)
{
super(emitter);
this.emitter = emitter;
this.config = config;
}
@Override
public Cache get()
{
return MemcachedCache.create(config, emitter);
return MemcachedCache.create(config);
}
}