mirror of https://github.com/apache/druid.git
log but do not fail if memcached operations do not get queued in time
This commit is contained in:
parent
552b365194
commit
8f6c313561
|
@ -24,6 +24,9 @@ import com.google.common.base.Preconditions;
|
||||||
import com.google.common.base.Throwables;
|
import com.google.common.base.Throwables;
|
||||||
import com.google.common.collect.Maps;
|
import com.google.common.collect.Maps;
|
||||||
import com.google.common.primitives.Ints;
|
import com.google.common.primitives.Ints;
|
||||||
|
import com.metamx.common.logger.Logger;
|
||||||
|
import com.metamx.emitter.service.ServiceEmitter;
|
||||||
|
import com.metamx.emitter.service.ServiceEventBuilder;
|
||||||
import net.spy.memcached.AddrUtil;
|
import net.spy.memcached.AddrUtil;
|
||||||
import net.spy.memcached.ConnectionFactoryBuilder;
|
import net.spy.memcached.ConnectionFactoryBuilder;
|
||||||
import net.spy.memcached.DefaultHashAlgorithm;
|
import net.spy.memcached.DefaultHashAlgorithm;
|
||||||
|
@ -47,6 +50,8 @@ import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
public class MemcachedCache implements Cache
|
public class MemcachedCache implements Cache
|
||||||
{
|
{
|
||||||
|
private static final Logger log = new Logger(MemcachedCache.class);
|
||||||
|
|
||||||
public static MemcachedCache create(final MemcachedCacheConfig config)
|
public static MemcachedCache create(final MemcachedCacheConfig config)
|
||||||
{
|
{
|
||||||
try {
|
try {
|
||||||
|
@ -60,9 +65,11 @@ public class MemcachedCache implements Cache
|
||||||
.setHashAlg(DefaultHashAlgorithm.FNV1A_64_HASH)
|
.setHashAlg(DefaultHashAlgorithm.FNV1A_64_HASH)
|
||||||
.setLocatorType(ConnectionFactoryBuilder.Locator.CONSISTENT)
|
.setLocatorType(ConnectionFactoryBuilder.Locator.CONSISTENT)
|
||||||
.setDaemon(true)
|
.setDaemon(true)
|
||||||
.setFailureMode(FailureMode.Retry)
|
.setFailureMode(FailureMode.Cancel)
|
||||||
.setTranscoder(transcoder)
|
.setTranscoder(transcoder)
|
||||||
.setShouldOptimize(true)
|
.setShouldOptimize(true)
|
||||||
|
.setOpQueueMaxBlockTime(config.getTimeout())
|
||||||
|
.setOpTimeout(config.getTimeout())
|
||||||
.build(),
|
.build(),
|
||||||
AddrUtil.getAddresses(config.getHosts())
|
AddrUtil.getAddresses(config.getHosts())
|
||||||
),
|
),
|
||||||
|
@ -112,7 +119,14 @@ public class MemcachedCache implements Cache
|
||||||
@Override
|
@Override
|
||||||
public byte[] get(NamedKey key)
|
public byte[] get(NamedKey key)
|
||||||
{
|
{
|
||||||
Future<Object> future = client.asyncGet(computeKeyHash(memcachedPrefix, key));
|
Future<Object> future;
|
||||||
|
try {
|
||||||
|
future = client.asyncGet(computeKeyHash(memcachedPrefix, key));
|
||||||
|
} catch(IllegalStateException e) {
|
||||||
|
// operation did not get queued in time (queue is full)
|
||||||
|
log.warn(e, "Unable to queue cache operation");
|
||||||
|
return null;
|
||||||
|
}
|
||||||
try {
|
try {
|
||||||
byte[] bytes = (byte[]) future.get(timeout, TimeUnit.MILLISECONDS);
|
byte[] bytes = (byte[]) future.get(timeout, TimeUnit.MILLISECONDS);
|
||||||
if(bytes != null) {
|
if(bytes != null) {
|
||||||
|
@ -140,7 +154,12 @@ public class MemcachedCache implements Cache
|
||||||
@Override
|
@Override
|
||||||
public void put(NamedKey key, byte[] value)
|
public void put(NamedKey key, byte[] value)
|
||||||
{
|
{
|
||||||
|
try {
|
||||||
client.set(computeKeyHash(memcachedPrefix, key), expiration, serializeValue(key, value));
|
client.set(computeKeyHash(memcachedPrefix, key), expiration, serializeValue(key, value));
|
||||||
|
} catch(IllegalStateException e) {
|
||||||
|
// operation did not get queued in time (queue is full)
|
||||||
|
log.warn(e, "Unable to queue cache operation");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static byte[] serializeValue(NamedKey key, byte[] value) {
|
private static byte[] serializeValue(NamedKey key, byte[] value) {
|
||||||
|
@ -183,7 +202,17 @@ public class MemcachedCache implements Cache
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
BulkFuture<Map<String, Object>> future = client.asyncGetBulk(keyLookup.keySet());
|
Map<NamedKey, byte[]> results = Maps.newHashMap();
|
||||||
|
|
||||||
|
BulkFuture<Map<String, Object>> future;
|
||||||
|
try {
|
||||||
|
future = client.asyncGetBulk(keyLookup.keySet());
|
||||||
|
} catch(IllegalStateException e) {
|
||||||
|
timeoutCount.incrementAndGet();
|
||||||
|
// operation did not get queued in time (queue is full)
|
||||||
|
log.warn(e, "Unable to queue cache operation");
|
||||||
|
return results;
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
Map<String, Object> some = future.getSome(timeout, TimeUnit.MILLISECONDS);
|
Map<String, Object> some = future.getSome(timeout, TimeUnit.MILLISECONDS);
|
||||||
|
@ -195,7 +224,6 @@ public class MemcachedCache implements Cache
|
||||||
missCount.addAndGet(keyLookup.size() - some.size());
|
missCount.addAndGet(keyLookup.size() - some.size());
|
||||||
hitCount.addAndGet(some.size());
|
hitCount.addAndGet(some.size());
|
||||||
|
|
||||||
Map<NamedKey, byte[]> results = Maps.newHashMap();
|
|
||||||
for(Map.Entry<String, Object> entry : some.entrySet()) {
|
for(Map.Entry<String, Object> entry : some.entrySet()) {
|
||||||
final NamedKey key = keyLookup.get(entry.getKey());
|
final NamedKey key = keyLookup.get(entry.getKey());
|
||||||
final byte[] value = (byte[]) entry.getValue();
|
final byte[] value = (byte[]) entry.getValue();
|
||||||
|
|
Loading…
Reference in New Issue