mirror of https://github.com/apache/druid.git
balance memcached cache across multiple connections
This commit is contained in:
parent 1f897257b5
commit df4c8a3aa5
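The change in brief: the cache no longer holds a single `MemcachedClientIF` but a `Supplier<ResourceHolder<MemcachedClientIF>>`. With `druid.cache.numConnections` greater than 1 the supplier is a `LoadBalancingPool` that spreads operations across several client connections; with the default of 1 it is a single client wrapped in a `StupidResourceHolder`. Every `get`/`put`/`getBulk` now borrows a client from that supplier and releases it when done. A minimal sketch of the borrow-and-release pattern follows; the class and method names here are hypothetical, only the types and the try-with-resources idiom come from the diff itself:

```java
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import io.druid.collections.ResourceHolder;
import net.spy.memcached.MemcachedClientIF;

import java.io.IOException;

// Hypothetical caller, not part of this commit: borrow one of the pooled clients,
// use it for a single operation, and let try-with-resources return it to the pool.
class PooledClientSketch
{
  static Object fetch(Supplier<ResourceHolder<MemcachedClientIF>> clientSupplier, String key)
  {
    try (ResourceHolder<MemcachedClientIF> clientHolder = clientSupplier.get()) {
      return clientHolder.get().get(key); // synchronous get, purely for illustration
    }
    catch (IOException e) {
      // ResourceHolder.close() declares IOException, matching how the real methods handle it
      throw Throwables.propagate(e);
    }
  }
}
```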
@@ -109,3 +109,4 @@ You can optionally only configure caching to be enabled on the broker by setting
|`druid.cache.hosts`|Comma-separated list of Memcached hosts `<host:port>`.|none|
|`druid.cache.maxObjectSize`|Maximum object size in bytes for a Memcached object.|52428800 (50 MB)|
|`druid.cache.memcachedPrefix`|Key prefix for all keys in Memcached.|druid|
|`druid.cache.numConnections`|Number of memcached connections to use.|1|
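The `druid.cache.numConnections` row is the addition here. As a rough illustration (an assumption about the surrounding wiring, not something this commit adds), the `druid.cache.*` properties are bound onto `MemcachedCacheConfig` via Jackson, so the new field deserializes like any other:

```java
import com.fasterxml.jackson.databind.ObjectMapper;

import io.druid.client.cache.MemcachedCacheConfig;

// Hypothetical illustration, not part of this commit: property names match the
// @JsonProperty fields shown in the MemcachedCacheConfig hunk further down.
public class CacheConfigBindingSketch
{
  public static void main(String[] args) throws Exception
  {
    String json = "{\"hosts\":\"memcached1:11211,memcached2:11211\",\"numConnections\":4}";
    MemcachedCacheConfig config = new ObjectMapper().readValue(json, MemcachedCacheConfig.class);
    System.out.println(config.getNumConnections()); // prints 4: four pooled connections per process
  }
}
```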
@@ -21,6 +21,8 @@ import com.google.common.base.Charsets;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;

@@ -32,7 +34,11 @@ import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import com.metamx.metrics.AbstractMonitor;
import com.metamx.metrics.MonitorScheduler;
import io.druid.collections.LoadBalancingPool;
import io.druid.collections.ResourceHolder;
import io.druid.collections.StupidResourceHolder;
import net.spy.memcached.AddrUtil;
import net.spy.memcached.ConnectionFactory;
import net.spy.memcached.ConnectionFactoryBuilder;
import net.spy.memcached.FailureMode;
import net.spy.memcached.HashAlgorithm;

@@ -40,16 +46,17 @@ import net.spy.memcached.MemcachedClient;
import net.spy.memcached.MemcachedClientIF;
import net.spy.memcached.internal.BulkFuture;
import net.spy.memcached.metrics.MetricCollector;
import net.spy.memcached.metrics.MetricType;
import net.spy.memcached.ops.LinkedOperationQueueFactory;
import net.spy.memcached.ops.OperationQueueFactory;
import org.apache.commons.codec.digest.DigestUtils;

import javax.annotation.Nullable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -154,28 +161,7 @@ public class MemcachedCache implements Cache
}
};

return new MemcachedCache(
new MemcachedClient(
new MemcachedCustomConnectionFactoryBuilder()
// 1000 repetitions gives us good distribution with murmur3_128
// (approx < 5% difference in counts across nodes, with 5 cache nodes)
.setKetamaNodeRepetitions(1000)
.setHashAlg(MURMUR3_128)
.setProtocol(ConnectionFactoryBuilder.Protocol.BINARY)
.setLocatorType(ConnectionFactoryBuilder.Locator.CONSISTENT)
.setDaemon(true)
.setFailureMode(FailureMode.Cancel)
.setTranscoder(transcoder)
.setShouldOptimize(true)
.setOpQueueMaxBlockTime(config.getTimeout())
.setOpTimeout(config.getTimeout())
.setReadBufferSize(config.getReadBufferSize())
.setOpQueueFactory(opQueueFactory)
.setEnableMetrics(MetricType.DEBUG) // Not as scary as it sounds
.setWriteOpQueueFactory(opQueueFactory)
.setReadOpQueueFactory(opQueueFactory)
.setMetricCollector(
new MetricCollector()
final MetricCollector metricCollector = new MetricCollector()
{
@Override
public void addCounter(String name)

@@ -217,7 +203,7 @@ public class MemcachedCache implements Cache
counter.incrementAndGet();

if (log.isDebugEnabled()) {
log.debug("Incrament [%s]", name);
log.debug("Increment [%s]", name);
}
}
@@ -325,13 +311,56 @@ public class MemcachedCache implements Cache
{
log.debug("Ignoring update histogram [%s]: %d", name, amount);
}
};

final ConnectionFactory connectionFactory = new MemcachedCustomConnectionFactoryBuilder()
// 1000 repetitions gives us good distribution with murmur3_128
// (approx < 5% difference in counts across nodes, with 5 cache nodes)
.setKetamaNodeRepetitions(1000)
.setHashAlg(MURMUR3_128)
.setProtocol(ConnectionFactoryBuilder.Protocol.BINARY)
.setLocatorType(ConnectionFactoryBuilder.Locator.CONSISTENT)
.setDaemon(true)
.setFailureMode(FailureMode.Cancel)
.setTranscoder(transcoder)
.setShouldOptimize(true)
.setOpQueueMaxBlockTime(config.getTimeout())
.setOpTimeout(config.getTimeout())
.setReadBufferSize(config.getReadBufferSize())
.setOpQueueFactory(opQueueFactory)
.setMetricCollector(metricCollector)
.build();

final List<InetSocketAddress> hosts = AddrUtil.getAddresses(config.getHosts());

final Supplier<ResourceHolder<MemcachedClientIF>> clientSupplier;

if (config.getNumConnections() > 1) {
clientSupplier = new LoadBalancingPool<MemcachedClientIF>(
config.getNumConnections(),
new Supplier<MemcachedClientIF>()
{
@Override
public MemcachedClientIF get()
{
try {
return new MemcachedClient(connectionFactory, hosts);
}
catch (IOException e) {
log.error(e, "Unable to create memcached client");
throw Throwables.propagate(e);
}
}
}
)
.build(),
AddrUtil.getAddresses(config.getHosts())
),
config
);
} else {
clientSupplier = Suppliers.<ResourceHolder<MemcachedClientIF>>ofInstance(
StupidResourceHolder.<MemcachedClientIF>create(new MemcachedClient(connectionFactory, hosts))
);
}

return new MemcachedCache(clientSupplier, config);
}
catch (IOException e) {
throw Throwables.propagate(e);
@@ -342,7 +371,7 @@ public class MemcachedCache implements Cache
private final int expiration;
private final String memcachedPrefix;

private final MemcachedClientIF client;
private final Supplier<ResourceHolder<MemcachedClientIF>> client;

private final AtomicLong hitCount = new AtomicLong(0);
private final AtomicLong missCount = new AtomicLong(0);

@@ -350,7 +379,7 @@ public class MemcachedCache implements Cache
private final AtomicLong errorCount = new AtomicLong(0);

MemcachedCache(MemcachedClientIF client, MemcachedCacheConfig config)
MemcachedCache(Supplier<ResourceHolder<MemcachedClientIF>> client, MemcachedCacheConfig config)
{
Preconditions.checkArgument(
config.getMemcachedPrefix().length() <= MAX_PREFIX_LENGTH,

@@ -381,9 +410,10 @@ public class MemcachedCache implements Cache
@Override
public byte[] get(NamedKey key)
{
try (ResourceHolder<MemcachedClientIF> clientHolder = client.get()) {
Future<Object> future;
try {
future = client.asyncGet(computeKeyHash(memcachedPrefix, key));
future = clientHolder.get().asyncGet(computeKeyHash(memcachedPrefix, key));
}
catch (IllegalStateException e) {
// operation did not get queued in time (queue is full)
@@ -415,18 +445,29 @@ public class MemcachedCache implements Cache
return null;
}
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}

@Override
public void put(NamedKey key, byte[] value)
{
try {
client.set(computeKeyHash(memcachedPrefix, key), expiration, serializeValue(key, value));
try (final ResourceHolder<MemcachedClientIF> clientHolder = client.get()) {
clientHolder.get().set(
computeKeyHash(memcachedPrefix, key),
expiration,
serializeValue(key, value)
);
}
catch (IllegalStateException e) {
// operation did not get queued in time (queue is full)
errorCount.incrementAndGet();
log.warn(e, "Unable to queue cache operation");
}
catch (IOException e) {
Throwables.propagate(e);
}
}

private static byte[] serializeValue(NamedKey key, byte[] value)
@@ -459,6 +500,7 @@ public class MemcachedCache implements Cache
@Override
public Map<NamedKey, byte[]> getBulk(Iterable<NamedKey> keys)
{
try (ResourceHolder<MemcachedClientIF> clientHolder = client.get()) {
Map<String, NamedKey> keyLookup = Maps.uniqueIndex(
keys,
new Function<NamedKey, String>()

@@ -477,7 +519,7 @@ public class MemcachedCache implements Cache

BulkFuture<Map<String, Object>> future;
try {
future = client.asyncGetBulk(keyLookup.keySet());
future = clientHolder.get().asyncGetBulk(keyLookup.keySet());
}
catch (IllegalStateException e) {
// operation did not get queued in time (queue is full)

@@ -517,6 +559,10 @@ public class MemcachedCache implements Cache
return results;
}
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}

@Override
public void close(String namespace)
@@ -51,6 +51,10 @@ public class MemcachedCacheConfig
@JsonProperty
private long maxOperationQueueSize = 0;

// size of memcached connection pool
@JsonProperty
private int numConnections = 1;

public int getExpiration()
{
return expiration;

@@ -85,4 +89,9 @@ public class MemcachedCacheConfig
{
return readBufferSize;
}

public int getNumConnections()
{
return numConnections;
}
}
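A pooled configuration can also be produced in code by overriding the new getter, mirroring the anonymous-subclass style the benchmark and test changes below use. A minimal sketch; the host value and connection count are arbitrary examples, not defaults from this commit:

```java
import io.druid.client.cache.MemcachedCacheConfig;

// Hypothetical sketch: a config whose numConnections > 1 selects the
// LoadBalancingPool code path in MemcachedCache instead of a single client.
class PooledConfigSketch
{
  static MemcachedCacheConfig pooledConfig()
  {
    return new MemcachedCacheConfig()
    {
      @Override
      public String getHosts()
      {
        return "localhost:11211"; // arbitrary example host
      }

      @Override
      public int getNumConnections()
      {
        return 4; // pool four memcached connections instead of the default single one
      }
    };
  }
}
```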
@@ -20,7 +20,10 @@ package io.druid.client.cache;
import com.google.caliper.Param;
import com.google.caliper.Runner;
import com.google.caliper.SimpleBenchmark;
import com.google.common.base.Suppliers;
import com.google.common.collect.Lists;
import io.druid.collections.ResourceHolder;
import io.druid.collections.StupidResourceHolder;
import net.spy.memcached.AddrUtil;
import net.spy.memcached.ConnectionFactoryBuilder;
import net.spy.memcached.DefaultHashAlgorithm;

@@ -77,7 +80,9 @@ public class MemcachedCacheBenchmark extends SimpleBenchmark

cache = new MemcachedCache(
client,
Suppliers.<ResourceHolder<MemcachedClientIF>>ofInstance(
StupidResourceHolder.create(client)
),
new MemcachedCacheConfig()
{
@Override
@@ -18,6 +18,7 @@
package io.druid.client.cache;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

@@ -34,6 +35,8 @@ import com.metamx.emitter.core.Event;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.metrics.Monitor;
import com.metamx.metrics.MonitorScheduler;
import io.druid.collections.ResourceHolder;
import io.druid.collections.StupidResourceHolder;
import io.druid.guice.GuiceInjectors;
import io.druid.guice.ManageLifecycle;
import io.druid.initialization.Initialization;

@@ -110,8 +113,12 @@ public class MemcachedCacheTest
@Before
public void setUp() throws Exception
{
MemcachedClientIF client = new MockMemcachedClient();
cache = new MemcachedCache(client, memcachedCacheConfig);
cache = new MemcachedCache(
Suppliers.<ResourceHolder<MemcachedClientIF>>ofInstance(
StupidResourceHolder.<MemcachedClientIF>create(new MockMemcachedClient())
),
memcachedCacheConfig
);
}

@Test