Merge branch 'master' of github.com:metamx/druid

This commit is contained in:
Eric Tschetter 2013-03-07 15:55:38 -06:00
commit e4de6f3deb
8 changed files with 71 additions and 17 deletions

View File

@ -248,7 +248,7 @@ public abstract class QueryableNode<T extends QueryableNode> extends Registering
{ {
if (requestLogger == null) { if (requestLogger == null) {
try { try {
setRequestLogger(Initialization.makeRequestLogger(getScheduledExecutorFactory(), getProps())); setRequestLogger(Initialization.makeRequestLogger(getJsonMapper(), getScheduledExecutorFactory(), getProps()));
} }
catch (IOException e) { catch (IOException e) {
throw Throwables.propagate(e); throw Throwables.propagate(e);

View File

@ -67,5 +67,6 @@ public class CacheMonitor extends AbstractMonitor
emitter.emit(builder.build(String.format("%s/hitRate", metricPrefix), cacheStats.hitRate())); emitter.emit(builder.build(String.format("%s/hitRate", metricPrefix), cacheStats.hitRate()));
emitter.emit(builder.build(String.format("%s/averageBytes", metricPrefix), cacheStats.averageBytes())); emitter.emit(builder.build(String.format("%s/averageBytes", metricPrefix), cacheStats.averageBytes()));
emitter.emit(builder.build(String.format("%s/timeouts", metricPrefix), cacheStats.getNumTimeouts())); emitter.emit(builder.build(String.format("%s/timeouts", metricPrefix), cacheStats.getNumTimeouts()));
emitter.emit(builder.build(String.format("%s/errors", metricPrefix), cacheStats.getNumErrors()));
} }
} }

View File

@ -29,6 +29,7 @@ public class CacheStats
private final long sizeInBytes; private final long sizeInBytes;
private final long numEvictions; private final long numEvictions;
private final long numTimeouts; private final long numTimeouts;
private final long numErrors;
public CacheStats( public CacheStats(
long numHits, long numHits,
@ -36,7 +37,8 @@ public class CacheStats
long size, long size,
long sizeInBytes, long sizeInBytes,
long numEvictions, long numEvictions,
long numTimeouts long numTimeouts,
long numErrors
) )
{ {
this.numHits = numHits; this.numHits = numHits;
@ -45,6 +47,7 @@ public class CacheStats
this.sizeInBytes = sizeInBytes; this.sizeInBytes = sizeInBytes;
this.numEvictions = numEvictions; this.numEvictions = numEvictions;
this.numTimeouts = numTimeouts; this.numTimeouts = numTimeouts;
this.numErrors = numErrors;
} }
public long getNumHits() public long getNumHits()
@ -77,6 +80,11 @@ public class CacheStats
return numTimeouts; return numTimeouts;
} }
public long getNumErrors()
{
return numErrors;
}
public long numLookups() public long numLookups()
{ {
return numHits + numMisses; return numHits + numMisses;
@ -104,7 +112,8 @@ public class CacheStats
size - oldStats.size, size - oldStats.size,
sizeInBytes - oldStats.sizeInBytes, sizeInBytes - oldStats.sizeInBytes,
numEvictions - oldStats.numEvictions, numEvictions - oldStats.numEvictions,
numTimeouts - oldStats.numTimeouts numTimeouts - oldStats.numTimeouts,
numErrors - oldStats.numErrors
); );
} }
} }

View File

@ -76,6 +76,7 @@ public class MapCache implements Cache
byteCountingLRUMap.size(), byteCountingLRUMap.size(),
byteCountingLRUMap.getNumBytes(), byteCountingLRUMap.getNumBytes(),
byteCountingLRUMap.getEvictionCount(), byteCountingLRUMap.getEvictionCount(),
0,
0 0
); );
} }

View File

@ -24,6 +24,9 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.primitives.Ints; import com.google.common.primitives.Ints;
import com.metamx.common.logger.Logger;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceEventBuilder;
import net.spy.memcached.AddrUtil; import net.spy.memcached.AddrUtil;
import net.spy.memcached.ConnectionFactoryBuilder; import net.spy.memcached.ConnectionFactoryBuilder;
import net.spy.memcached.DefaultHashAlgorithm; import net.spy.memcached.DefaultHashAlgorithm;
@ -47,6 +50,8 @@ import java.util.concurrent.atomic.AtomicLong;
public class MemcachedCache implements Cache public class MemcachedCache implements Cache
{ {
private static final Logger log = new Logger(MemcachedCache.class);
public static MemcachedCache create(final MemcachedCacheConfig config) public static MemcachedCache create(final MemcachedCacheConfig config)
{ {
try { try {
@ -60,9 +65,11 @@ public class MemcachedCache implements Cache
.setHashAlg(DefaultHashAlgorithm.FNV1A_64_HASH) .setHashAlg(DefaultHashAlgorithm.FNV1A_64_HASH)
.setLocatorType(ConnectionFactoryBuilder.Locator.CONSISTENT) .setLocatorType(ConnectionFactoryBuilder.Locator.CONSISTENT)
.setDaemon(true) .setDaemon(true)
.setFailureMode(FailureMode.Retry) .setFailureMode(FailureMode.Cancel)
.setTranscoder(transcoder) .setTranscoder(transcoder)
.setShouldOptimize(true) .setShouldOptimize(true)
.setOpQueueMaxBlockTime(config.getTimeout())
.setOpTimeout(config.getTimeout())
.build(), .build(),
AddrUtil.getAddresses(config.getHosts()) AddrUtil.getAddresses(config.getHosts())
), ),
@ -84,6 +91,7 @@ public class MemcachedCache implements Cache
private final AtomicLong hitCount = new AtomicLong(0); private final AtomicLong hitCount = new AtomicLong(0);
private final AtomicLong missCount = new AtomicLong(0); private final AtomicLong missCount = new AtomicLong(0);
private final AtomicLong timeoutCount = new AtomicLong(0); private final AtomicLong timeoutCount = new AtomicLong(0);
private final AtomicLong errorCount = new AtomicLong(0);
MemcachedCache(MemcachedClientIF client, String memcachedPrefix, int timeout, int expiration) { MemcachedCache(MemcachedClientIF client, String memcachedPrefix, int timeout, int expiration) {
Preconditions.checkArgument(memcachedPrefix.length() <= MAX_PREFIX_LENGTH, Preconditions.checkArgument(memcachedPrefix.length() <= MAX_PREFIX_LENGTH,
@ -105,14 +113,23 @@ public class MemcachedCache implements Cache
0, 0,
0, 0,
0, 0,
timeoutCount.get() timeoutCount.get(),
errorCount.get()
); );
} }
@Override @Override
public byte[] get(NamedKey key) public byte[] get(NamedKey key)
{ {
Future<Object> future = client.asyncGet(computeKeyHash(memcachedPrefix, key)); Future<Object> future;
try {
future = client.asyncGet(computeKeyHash(memcachedPrefix, key));
} catch(IllegalStateException e) {
// operation did not get queued in time (queue is full)
errorCount.incrementAndGet();
log.warn(e, "Unable to queue cache operation");
return null;
}
try { try {
byte[] bytes = (byte[]) future.get(timeout, TimeUnit.MILLISECONDS); byte[] bytes = (byte[]) future.get(timeout, TimeUnit.MILLISECONDS);
if(bytes != null) { if(bytes != null) {
@ -133,14 +150,22 @@ public class MemcachedCache implements Cache
throw Throwables.propagate(e); throw Throwables.propagate(e);
} }
catch(ExecutionException e) { catch(ExecutionException e) {
throw Throwables.propagate(e); errorCount.incrementAndGet();
log.warn(e, "Exception pulling item from cache");
return null;
} }
} }
@Override @Override
public void put(NamedKey key, byte[] value) public void put(NamedKey key, byte[] value)
{ {
try {
client.set(computeKeyHash(memcachedPrefix, key), expiration, serializeValue(key, value)); client.set(computeKeyHash(memcachedPrefix, key), expiration, serializeValue(key, value));
} catch(IllegalStateException e) {
// operation did not get queued in time (queue is full)
errorCount.incrementAndGet();
log.warn(e, "Unable to queue cache operation");
}
} }
private static byte[] serializeValue(NamedKey key, byte[] value) { private static byte[] serializeValue(NamedKey key, byte[] value) {
@ -183,7 +208,17 @@ public class MemcachedCache implements Cache
} }
); );
BulkFuture<Map<String, Object>> future = client.asyncGetBulk(keyLookup.keySet()); Map<NamedKey, byte[]> results = Maps.newHashMap();
BulkFuture<Map<String, Object>> future;
try {
future = client.asyncGetBulk(keyLookup.keySet());
} catch(IllegalStateException e) {
// operation did not get queued in time (queue is full)
errorCount.incrementAndGet();
log.warn(e, "Unable to queue cache operation");
return results;
}
try { try {
Map<String, Object> some = future.getSome(timeout, TimeUnit.MILLISECONDS); Map<String, Object> some = future.getSome(timeout, TimeUnit.MILLISECONDS);
@ -195,7 +230,6 @@ public class MemcachedCache implements Cache
missCount.addAndGet(keyLookup.size() - some.size()); missCount.addAndGet(keyLookup.size() - some.size());
hitCount.addAndGet(some.size()); hitCount.addAndGet(some.size());
Map<NamedKey, byte[]> results = Maps.newHashMap();
for(Map.Entry<String, Object> entry : some.entrySet()) { for(Map.Entry<String, Object> entry : some.entrySet()) {
final NamedKey key = keyLookup.get(entry.getKey()); final NamedKey key = keyLookup.get(entry.getKey());
final byte[] value = (byte[]) entry.getValue(); final byte[] value = (byte[]) entry.getValue();
@ -212,7 +246,9 @@ public class MemcachedCache implements Cache
throw Throwables.propagate(e); throw Throwables.propagate(e);
} }
catch(ExecutionException e) { catch(ExecutionException e) {
throw Throwables.propagate(e); errorCount.incrementAndGet();
log.warn(e, "Exception pulling item from cache");
return results;
} }
} }

View File

@ -19,6 +19,7 @@
package com.metamx.druid.http; package com.metamx.druid.http;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.io.Closeables; import com.google.common.io.Closeables;
import com.metamx.common.concurrent.ScheduledExecutors; import com.metamx.common.concurrent.ScheduledExecutors;
@ -38,6 +39,7 @@ import java.util.concurrent.ScheduledExecutorService;
*/ */
public class FileRequestLogger implements RequestLogger public class FileRequestLogger implements RequestLogger
{ {
private final ObjectMapper objectMapper;
private final ScheduledExecutorService exec; private final ScheduledExecutorService exec;
private final File baseDir; private final File baseDir;
@ -46,9 +48,11 @@ public class FileRequestLogger implements RequestLogger
private volatile DateTime currentDay; private volatile DateTime currentDay;
private volatile FileWriter fileWriter; private volatile FileWriter fileWriter;
public FileRequestLogger(ScheduledExecutorService exec, File baseDir)
public FileRequestLogger(ObjectMapper objectMapper, ScheduledExecutorService exec, File baseDir)
{ {
this.exec = exec; this.exec = exec;
this.objectMapper = objectMapper;
this.baseDir = baseDir; this.baseDir = baseDir;
} }
@ -110,7 +114,7 @@ public class FileRequestLogger implements RequestLogger
{ {
synchronized (lock) { synchronized (lock) {
fileWriter.write( fileWriter.write(
String.format("%s%n", requestLogLine.getLine()) String.format("%s%n", requestLogLine.getLine(objectMapper))
); );
fileWriter.flush(); fileWriter.flush();
} }

View File

@ -19,6 +19,8 @@
package com.metamx.druid.http; package com.metamx.druid.http;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
import com.metamx.druid.Query; import com.metamx.druid.Query;
import org.joda.time.DateTime; import org.joda.time.DateTime;
@ -40,13 +42,13 @@ public class RequestLogLine
this.query = query; this.query = query;
} }
public String getLine() public String getLine(ObjectMapper objectMapper) throws JsonProcessingException
{ {
return JOINER.join( return JOINER.join(
Arrays.asList( Arrays.asList(
timestamp, timestamp,
remoteAddr, remoteAddr,
query objectMapper.writeValueAsString(query)
) )
); );
} }

View File

@ -348,9 +348,10 @@ public class Initialization
return serviceProvider; return serviceProvider;
} }
public static RequestLogger makeRequestLogger(ScheduledExecutorFactory factory, Properties props) throws IOException public static RequestLogger makeRequestLogger(ObjectMapper objectMapper, ScheduledExecutorFactory factory, Properties props) throws IOException
{ {
return new FileRequestLogger( return new FileRequestLogger(
objectMapper,
factory.create(1, "RequestLogger-%s"), factory.create(1, "RequestLogger-%s"),
new File(PropUtils.getProperty(props, "druid.request.logging.dir")) new File(PropUtils.getProperty(props, "druid.request.logging.dir"))
); );