Merge pull request #1688 from metamx/moreMemcachedMetrics

More memcached metrics
Xavier Léauté 2015-09-15 15:33:51 -07:00
commit b464da438c
4 changed files with 450 additions and 71 deletions

File 1 of 4: Metrics documentation

@@ -72,6 +72,14 @@ Available Metrics

|`*/timeouts`|Number of cache timeouts.||0|
|`*/errors`|Number of cache errors.||0|

#### Memcached only metrics

Memcached client metrics are reported as follows. These metrics come directly from the memcached client, as opposed to the cache retrieval layer. An example emitted pair is shown after the table.

|Metric|Description|Dimensions|Normal Value|
|------|-----------|----------|------------|
|`query/cache/memcached/total`|Cache metrics unique to memcached (only if `druid.cache.type=memcached`), reported as their actual values.|Variable|N/A|
|`query/cache/memcached/delta`|Cache metrics unique to memcached (only if `druid.cache.type=memcached`), reported as the delta from the prior event emission.|Variable|N/A|
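Each collected client metric yields a `total`/`delta` pair per emission period. A hypothetical pair is sketched below; the `memcached metric` dimension values come straight from the spymemcached client and vary by client version, and the service, host, timestamp, and metric-name values here are made up for illustration:

```json
{"feed":"metrics","timestamp":"2015-09-15T22:33:51.000Z","service":"druid/historical","host":"example.com:8083","metric":"query/cache/memcached/total","value":1234,"memcached metric":"SomeClientCounter"}
{"feed":"metrics","timestamp":"2015-09-15T22:33:51.000Z","service":"druid/historical","host":"example.com:8083","metric":"query/cache/memcached/delta","value":12,"memcached metric":"SomeClientCounter"}
```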
## Ingestion Metrics

|Metric|Description|Dimensions|Normal Value|

File 2 of 4: io.druid.client.cache.MemcachedCache

@@ -20,12 +20,18 @@ package io.druid.client.cache;

import com.google.common.base.Charsets;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import com.google.common.primitives.Ints;
import com.metamx.common.logger.Logger;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import com.metamx.metrics.AbstractMonitor;
import com.metamx.metrics.MonitorScheduler;
import net.spy.memcached.AddrUtil;
import net.spy.memcached.ConnectionFactoryBuilder;
import net.spy.memcached.FailureMode;

@@ -33,6 +39,8 @@ import net.spy.memcached.HashAlgorithm;

import net.spy.memcached.MemcachedClient;
import net.spy.memcached.MemcachedClientIF;
import net.spy.memcached.internal.BulkFuture;
import net.spy.memcached.metrics.MetricCollector;
import net.spy.memcached.metrics.MetricType;
import net.spy.memcached.ops.LinkedOperationQueueFactory;
import net.spy.memcached.ops.OperationQueueFactory;
import org.apache.commons.codec.digest.DigestUtils;

@@ -41,12 +49,16 @@ import javax.annotation.Nullable;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

public class MemcachedCache implements Cache
{
@@ -69,8 +81,54 @@ public class MemcachedCache implements Cache
    }
  };

  public static MemcachedCache create(final MemcachedCacheConfig config, MonitorScheduler emitter)
  {
    final ConcurrentMap<String, AtomicLong> counters = new ConcurrentHashMap<>();
    final ConcurrentMap<String, AtomicLong> meters = new ConcurrentHashMap<>();

    // Register a monitor that emits every collected client metric, both as a
    // running total and as the delta since the previous emission.
    emitter.addMonitor(
        new AbstractMonitor()
        {
          final AtomicReference<Map<String, Long>> priorValues =
              new AtomicReference<Map<String, Long>>(new HashMap<String, Long>());

          @Override
          public boolean doMonitor(ServiceEmitter emitter)
          {
            final Map<String, Long> priorValues = this.priorValues.get();
            final Map<String, Long> currentValues = getCurrentValues();
            final ServiceMetricEvent.Builder builder = ServiceMetricEvent.builder();
            for (Map.Entry<String, Long> entry : currentValues.entrySet()) {
              emitter.emit(
                  builder.setDimension("memcached metric", entry.getKey())
                         .build("query/cache/memcached/total", entry.getValue())
              );
              final Long prior = priorValues.get(entry.getKey());
              if (prior != null) {
                emitter.emit(
                    builder.setDimension("memcached metric", entry.getKey()).build(
                        "query/cache/memcached/delta",
                        entry.getValue() - prior
                    )
                );
              }
            }
            if (!this.priorValues.compareAndSet(priorValues, currentValues)) {
              log.error("Prior value changed while reporting! Updating anyway.");
              this.priorValues.set(currentValues);
            }
            return true;
          }

          private Map<String, Long> getCurrentValues()
          {
            final ImmutableMap.Builder<String, Long> builder = ImmutableMap.builder();
            for (Map.Entry<String, AtomicLong> entry : counters.entrySet()) {
              builder.put(entry.getKey(), entry.getValue().get());
            }
            return builder.build();
          }
        }
    );

    try {
      LZ4Transcoder transcoder = new LZ4Transcoder(config.getMaxObjectSize());
@@ -79,35 +137,203 @@ public class MemcachedCache implements Cache

      OperationQueueFactory opQueueFactory;
      long maxQueueBytes = config.getMaxOperationQueueSize();
      if (maxQueueBytes > 0) {
        opQueueFactory = new MemcachedOperationQueueFactory(maxQueueBytes);
      } else {
        opQueueFactory = new LinkedOperationQueueFactory();
      }

      final Predicate<String> interesting = new Predicate<String>()
      {
        @Override
        public boolean apply(@Nullable String input)
        {
          // See net.spy.memcached.MemcachedConnection.registerMetrics();
          // in the current client version the shutdown queue metric is broken,
          // so skip any metric whose name contains "Down".
          return input != null && !input.contains("Down");
        }
      };
      return new MemcachedCache(
          new MemcachedClient(
              new MemcachedCustomConnectionFactoryBuilder()
                  // 1000 repetitions gives us good distribution with murmur3_128
                  // (approx < 5% difference in counts across nodes, with 5 cache nodes)
                  .setKetamaNodeRepetitions(1000)
                  .setHashAlg(MURMUR3_128)
                  .setProtocol(ConnectionFactoryBuilder.Protocol.BINARY)
                  .setLocatorType(ConnectionFactoryBuilder.Locator.CONSISTENT)
                  .setDaemon(true)
                  .setFailureMode(FailureMode.Cancel)
                  .setTranscoder(transcoder)
                  .setShouldOptimize(true)
                  .setOpQueueMaxBlockTime(config.getTimeout())
                  .setOpTimeout(config.getTimeout())
                  .setReadBufferSize(config.getReadBufferSize())
                  .setOpQueueFactory(opQueueFactory)
                  .setEnableMetrics(MetricType.DEBUG) // Not as scary as it sounds
                  .setWriteOpQueueFactory(opQueueFactory)
                  .setReadOpQueueFactory(opQueueFactory)
                  .setMetricCollector(
                      new MetricCollector()
                      {
                        @Override
                        public void addCounter(String name)
                        {
                          if (!interesting.apply(name)) {
                            return;
                          }
                          counters.put(name, new AtomicLong(0L));
                          if (log.isDebugEnabled()) {
                            log.debug("Add Counter [%s]", name);
                          }
                        }

                        @Override
                        public void removeCounter(String name)
                        {
                          if (!interesting.apply(name)) {
                            return;
                          }
                          counters.remove(name);
                          if (log.isDebugEnabled()) {
                            log.debug("Remove Counter [%s]", name);
                          }
                        }

                        @Override
                        public void incrementCounter(String name)
                        {
                          if (!interesting.apply(name)) {
                            return;
                          }
                          AtomicLong counter = counters.get(name);
                          if (counter == null) {
                            counters.putIfAbsent(name, new AtomicLong(0));
                            counter = counters.get(name);
                          }
                          counter.incrementAndGet();
                          if (log.isDebugEnabled()) {
                            log.debug("Increment [%s]", name);
                          }
                        }

                        @Override
                        public void incrementCounter(String name, int amount)
                        {
                          if (!interesting.apply(name)) {
                            return;
                          }
                          AtomicLong counter = counters.get(name);
                          if (counter == null) {
                            counters.putIfAbsent(name, new AtomicLong(0));
                            counter = counters.get(name);
                          }
                          counter.addAndGet(amount);
                          if (log.isDebugEnabled()) {
                            log.debug("Increment [%s] %d", name, amount);
                          }
                        }

                        @Override
                        public void decrementCounter(String name)
                        {
                          if (!interesting.apply(name)) {
                            return;
                          }
                          AtomicLong counter = counters.get(name);
                          if (counter == null) {
                            counters.putIfAbsent(name, new AtomicLong(0));
                            counter = counters.get(name);
                          }
                          counter.decrementAndGet();
                          if (log.isDebugEnabled()) {
                            log.debug("Decrement [%s]", name);
                          }
                        }

                        @Override
                        public void decrementCounter(String name, int amount)
                        {
                          if (!interesting.apply(name)) {
                            return;
                          }
                          AtomicLong counter = counters.get(name);
                          if (counter == null) {
                            counters.putIfAbsent(name, new AtomicLong(0L));
                            counter = counters.get(name);
                          }
                          counter.addAndGet(-amount);
                          if (log.isDebugEnabled()) {
                            log.debug("Decrement [%s] %d", name, amount);
                          }
                        }

                        @Override
                        public void addMeter(String name)
                        {
                          meters.put(name, new AtomicLong(0L));
                          if (log.isDebugEnabled()) {
                            log.debug("Adding meter [%s]", name);
                          }
                        }

                        @Override
                        public void removeMeter(String name)
                        {
                          meters.remove(name);
                          if (log.isDebugEnabled()) {
                            log.debug("Removing meter [%s]", name);
                          }
                        }

                        @Override
                        public void markMeter(String name)
                        {
                          AtomicLong meter = meters.get(name);
                          if (meter == null) {
                            meters.putIfAbsent(name, new AtomicLong(0L));
                            meter = meters.get(name);
                          }
                          meter.incrementAndGet();
                          if (log.isDebugEnabled()) {
                            log.debug("Mark meter [%s]", name);
                          }
                        }

                        @Override
                        public void addHistogram(String name)
                        {
                          log.debug("Ignoring add histogram [%s]", name);
                        }

                        @Override
                        public void removeHistogram(String name)
                        {
                          log.debug("Ignoring remove histogram [%s]", name);
                        }

                        @Override
                        public void updateHistogram(String name, int amount)
                        {
                          log.debug("Ignoring update histogram [%s]: %d", name, amount);
                        }
                      }
                  )
                  .build(),
              AddrUtil.getAddresses(config.getHosts())
          ),
          config
      );
    }
    catch (IOException e) {
      throw Throwables.propagate(e);
    }
  }
@@ -124,11 +350,14 @@ public class MemcachedCache implements Cache

  private final AtomicLong errorCount = new AtomicLong(0);

  MemcachedCache(MemcachedClientIF client, MemcachedCacheConfig config)
  {
    Preconditions.checkArgument(
        config.getMemcachedPrefix().length() <= MAX_PREFIX_LENGTH,
        "memcachedPrefix length [%d] exceeds maximum length [%d]",
        config.getMemcachedPrefix().length(),
        MAX_PREFIX_LENGTH
    );
    this.timeout = config.getTimeout();
    this.expiration = config.getExpiration();
    this.client = client;
@@ -155,7 +384,8 @@ public class MemcachedCache implements Cache

    Future<Object> future;
    try {
      future = client.asyncGet(computeKeyHash(memcachedPrefix, key));
    }
    catch (IllegalStateException e) {
      // operation did not get queued in time (queue is full)
      errorCount.incrementAndGet();
      log.warn(e, "Unable to queue cache operation");

@@ -163,24 +393,23 @@ public class MemcachedCache implements Cache
    }
    try {
      byte[] bytes = (byte[]) future.get(timeout, TimeUnit.MILLISECONDS);
      if (bytes != null) {
        hitCount.incrementAndGet();
      } else {
        missCount.incrementAndGet();
      }
      return bytes == null ? null : deserializeValue(key, bytes);
    }
    catch (TimeoutException e) {
      timeoutCount.incrementAndGet();
      future.cancel(false);
      return null;
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw Throwables.propagate(e);
    }
    catch (ExecutionException e) {
      errorCount.incrementAndGet();
      log.warn(e, "Exception pulling item from cache");
      return null;
@@ -192,14 +421,16 @@ public class MemcachedCache implements Cache
  {
    try {
      client.set(computeKeyHash(memcachedPrefix, key), expiration, serializeValue(key, value));
    }
    catch (IllegalStateException e) {
      // operation did not get queued in time (queue is full)
      errorCount.incrementAndGet();
      log.warn(e, "Unable to queue cache operation");
    }
  }
  private static byte[] serializeValue(NamedKey key, byte[] value)
  {
    byte[] keyBytes = key.toByteArray();
    return ByteBuffer.allocate(Ints.BYTES + keyBytes.length + value.length)
                     .putInt(keyBytes.length)

@@ -208,7 +439,8 @@ public class MemcachedCache implements Cache
                     .array();
  }

  private static byte[] deserializeValue(NamedKey key, byte[] bytes)
  {
    ByteBuffer buf = ByteBuffer.wrap(bytes);

    final int keyLength = buf.getInt();

@@ -217,8 +449,10 @@ public class MemcachedCache implements Cache
    byte[] value = new byte[buf.remaining()];
    buf.get(value);

    Preconditions.checkState(
        Arrays.equals(keyBytes, key.toByteArray()),
        "Keys do not match, possible hash collision?"
    );
    return value;
  }
@@ -244,7 +478,8 @@ public class MemcachedCache implements Cache

    BulkFuture<Map<String, Object>> future;
    try {
      future = client.asyncGetBulk(keyLookup.keySet());
    }
    catch (IllegalStateException e) {
      // operation did not get queued in time (queue is full)
      errorCount.incrementAndGet();
      log.warn(e, "Unable to queue cache operation");

@@ -254,14 +489,14 @@ public class MemcachedCache implements Cache
    try {
      Map<String, Object> some = future.getSome(timeout, TimeUnit.MILLISECONDS);

      if (future.isTimeout()) {
        future.cancel(false);
        timeoutCount.incrementAndGet();
      }
      missCount.addAndGet(keyLookup.size() - some.size());
      hitCount.addAndGet(some.size());

      for (Map.Entry<String, Object> entry : some.entrySet()) {
        final NamedKey key = keyLookup.get(entry.getKey());
        final byte[] value = (byte[]) entry.getValue();
        results.put(

@@ -272,11 +507,11 @@ public class MemcachedCache implements Cache
      return results;
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw Throwables.propagate(e);
    }
    catch (ExecutionException e) {
      errorCount.incrementAndGet();
      log.warn(e, "Exception pulling item from cache");
      return results;
@@ -290,18 +525,20 @@ public class MemcachedCache implements Cache
  }

  // Keys are laid out as "<prefix>:<sha1(namespace)>:<sha1(key)>"; each SHA-1 hex
  // digest is 40 characters and the two ':' separators take 2 more, so the prefix
  // may use whatever remains of memcached's key-length limit.
  public static final int MAX_PREFIX_LENGTH =
      MemcachedClientIF.MAX_KEY_LENGTH
      - 40 // length of namespace hash
      - 40 // length of key hash
      - 2  // length of separators
      ;

  private static String computeKeyHash(String memcachedPrefix, NamedKey key)
  {
    // hash keys to keep things under 250 characters for memcached
    return memcachedPrefix + ":" + DigestUtils.sha1Hex(key.namespace) + ":" + DigestUtils.sha1Hex(key.key);
  }

  public boolean isLocal()
  {
    return false;
  }
}
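The monitor registered in `create()` above derives the `delta` series from the `total` series with a snapshot-and-diff pattern. Below is a minimal standalone sketch of the same idea; the class and method names are hypothetical and not part of this commit:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration: report each counter's running total plus its
// change since the previous report, mirroring the total/delta events above.
class DeltaReporter
{
  private final AtomicReference<Map<String, Long>> prior =
      new AtomicReference<Map<String, Long>>(new HashMap<String, Long>());

  void report(Map<String, Long> current)
  {
    final Map<String, Long> previous = prior.get();
    for (Map.Entry<String, Long> e : current.entrySet()) {
      System.out.printf("total[%s] = %d%n", e.getKey(), e.getValue());
      final Long old = previous.get(e.getKey());
      if (old != null) {
        // A delta is only emitted once a prior snapshot exists for this key.
        System.out.printf("delta[%s] = %d%n", e.getKey(), e.getValue() - old);
      }
    }
    prior.set(current); // single-threaded sketch, so no compareAndSet is needed
  }
}
```

The `compareAndSet` in the real monitor guards against a concurrent `doMonitor` call replacing the snapshot mid-report; this sketch assumes a single caller.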

File 3 of 4: io.druid.client.cache.MemcachedCacheProvider

@@ -17,11 +17,22 @@

package io.druid.client.cache;

import com.google.inject.Inject;
import com.metamx.metrics.MonitorScheduler;

public class MemcachedCacheProvider extends MemcachedCacheConfig implements CacheProvider
{
  private final MonitorScheduler emitter;

  @Inject
  public MemcachedCacheProvider(MonitorScheduler emitter)
  {
    this.emitter = emitter;
  }

  @Override
  public Cache get()
  {
    return MemcachedCache.create(this, emitter);
  }
}

File 4 of 4: io.druid.client.cache.MemcachedCacheTest

@@ -17,9 +17,27 @@

package io.druid.client.cache;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.primitives.Ints;
import com.google.inject.Binder;
import com.google.inject.Inject;
import com.google.inject.Injector;
import com.google.inject.Module;
import com.google.inject.name.Names;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;
import com.metamx.emitter.core.Emitter;
import com.metamx.emitter.core.Event;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.metrics.Monitor;
import com.metamx.metrics.MonitorScheduler;
import io.druid.guice.GuiceInjectors;
import io.druid.guice.ManageLifecycle;
import io.druid.initialization.Initialization;
import io.druid.jackson.DefaultObjectMapper;
import net.spy.memcached.BroadcastOpFactory;
import net.spy.memcached.CASResponse;
import net.spy.memcached.CASValue;

@@ -34,11 +52,14 @@ import net.spy.memcached.internal.OperationFuture;

import net.spy.memcached.ops.OperationStatus;
import net.spy.memcached.transcoders.SerializingTranscoder;
import net.spy.memcached.transcoders.Transcoder;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;

@@ -55,37 +76,119 @@ import java.util.concurrent.TimeoutException;
 */
public class MemcachedCacheTest
{
  private static final Logger log = new Logger(MemcachedCacheTest.class);
  private static final byte[] HI = "hiiiiiiiiiiiiiiiiiii".getBytes();
  private static final byte[] HO = "hooooooooooooooooooo".getBytes();
  private MemcachedCache cache;
  private final MemcachedCacheConfig memcachedCacheConfig = new MemcachedCacheConfig()
  {
    @Override
    public String getMemcachedPrefix()
    {
      return "druid-memcached-test";
    }

    @Override
    public int getTimeout()
    {
      return 10;
    }

    @Override
    public int getExpiration()
    {
      return 3600;
    }

    @Override
    public String getHosts()
    {
      return "localhost:9999";
    }
  };

  @Before
  public void setUp() throws Exception
  {
    MemcachedClientIF client = new MockMemcachedClient();
    cache = new MemcachedCache(client, memcachedCacheConfig);
  }

  @Test
  public void testBasicInjection() throws Exception
  {
    final MemcachedCacheConfig config = new MemcachedCacheConfig()
    {
      @Override
      public String getHosts()
      {
        return "127.0.0.1:22";
      }
    };
    Injector injector = Initialization.makeInjectorWithModules(
        GuiceInjectors.makeStartupInjector(), ImmutableList.of(
            new Module()
            {
              @Override
              public void configure(Binder binder)
              {
                binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/test/memcached");
                binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0);

                binder.bind(MemcachedCacheConfig.class).toInstance(config);
                binder.bind(Cache.class).toProvider(MemcachedProviderWithConfig.class).in(ManageLifecycle.class);
              }
            }
        )
    );
    Lifecycle lifecycle = injector.getInstance(Lifecycle.class);
    lifecycle.start();
    try {
      Cache cache = injector.getInstance(Cache.class);
      Assert.assertEquals(MemcachedCache.class, cache.getClass());
    }
    finally {
      lifecycle.stop();
    }
  }

  @Test
  public void testMonitor() throws Exception
  {
    MonitorScheduler monitorScheduler = EasyMock.createNiceMock(MonitorScheduler.class);
    Capture<? extends Monitor> monitorCapture = Capture.newInstance();
    monitorScheduler.addMonitor(EasyMock.capture(monitorCapture));
    EasyMock.expectLastCall().once();
    EasyMock.replay(monitorScheduler);

    MemcachedCache cache = MemcachedCache.create(memcachedCacheConfig, monitorScheduler);
    EasyMock.verify(monitorScheduler);
    Assert.assertTrue(monitorCapture.hasCaptured());

    final Monitor monitor = monitorCapture.getValue();
    monitor.start();
    Assert.assertNotNull(monitor);

    Emitter emitter = EasyMock.createNiceMock(Emitter.class);
    final Collection<Event> events = new ArrayList<>();
    final ServiceEmitter serviceEmitter = new ServiceEmitter("service", "host", emitter)
    {
      @Override
      public void emit(Event event)
      {
        events.add(event);
      }
    };

    while (events.isEmpty()) {
      Thread.sleep(memcachedCacheConfig.getTimeout());
      Assert.assertTrue(monitor.monitor(serviceEmitter));
    }

    Assert.assertFalse(events.isEmpty());

    ObjectMapper mapper = new DefaultObjectMapper();
    for (Event event : events) {
      log.debug("Found event `%s`", mapper.writeValueAsString(event.toMap()));
    }
  }
  @Test

@@ -146,6 +249,26 @@ public class MemcachedCacheTest
  }
}
class MemcachedProviderWithConfig extends MemcachedCacheProvider
{
  private final MemcachedCacheConfig config;
  private final MonitorScheduler emitter;

  @Inject
  public MemcachedProviderWithConfig(MonitorScheduler emitter, MemcachedCacheConfig config)
  {
    super(emitter);
    this.emitter = emitter;
    this.config = config;
  }

  @Override
  public Cache get()
  {
    return MemcachedCache.create(config, emitter);
  }
}
class MockMemcachedClient implements MemcachedClientIF
{
  private final ConcurrentMap<String, CachedData> theMap = new ConcurrentHashMap<String, CachedData>();

@@ -864,4 +987,4 @@ class MockMemcachedClient implements MemcachedClientIF
  {
    throw new UnsupportedOperationException("not implemented");
  }
}