mirror of
https://github.com/apache/druid.git
synced 2025-02-17 15:35:56 +00:00
Merge pull request #1744 from metamx/memcached-connection-pooling
Memcached connection pooling
This commit is contained in:
commit
d325bb55ae
146
common/src/main/java/io/druid/collections/LoadBalancingPool.java
Normal file
146
common/src/main/java/io/druid/collections/LoadBalancingPool.java
Normal file
@ -0,0 +1,146 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. Metamarkets licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.druid.collections;
|
||||||
|
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
import com.google.common.base.Supplier;
|
||||||
|
import com.google.common.base.Throwables;
|
||||||
|
import com.metamx.common.logger.Logger;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.concurrent.PriorityBlockingQueue;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Simple load balancing pool that always returns the least used item.
|
||||||
|
*
|
||||||
|
* An item's usage is incremented every time one gets requested from the pool
|
||||||
|
* and is decremented every time close is called on the holder.
|
||||||
|
*
|
||||||
|
* The pool eagerly instantiates all the items in the pool when created,
|
||||||
|
* using the given supplier.
|
||||||
|
*
|
||||||
|
* @param <T> type of items to pool
|
||||||
|
*/
|
||||||
|
public class LoadBalancingPool<T> implements Supplier<ResourceHolder<T>>
|
||||||
|
{
|
||||||
|
private static final Logger log = new Logger(LoadBalancingPool.class);
|
||||||
|
|
||||||
|
private final Supplier<T> generator;
|
||||||
|
private final int capacity;
|
||||||
|
private final PriorityBlockingQueue<CountingHolder> queue;
|
||||||
|
|
||||||
|
public LoadBalancingPool(int capacity, Supplier<T> generator)
|
||||||
|
{
|
||||||
|
Preconditions.checkArgument(capacity > 0, "capacity must be greater than 0");
|
||||||
|
Preconditions.checkNotNull(generator);
|
||||||
|
|
||||||
|
this.generator = generator;
|
||||||
|
this.capacity = capacity;
|
||||||
|
this.queue = new PriorityBlockingQueue<>(capacity);
|
||||||
|
|
||||||
|
// eagerly intantiate all items in the pool
|
||||||
|
init();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void init() {
|
||||||
|
for(int i = 0; i < capacity; ++i) {
|
||||||
|
queue.offer(new CountingHolder(generator.get()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public ResourceHolder<T> get()
|
||||||
|
{
|
||||||
|
final CountingHolder holder;
|
||||||
|
// items never stay out of the queue for long, so we'll get one eventually
|
||||||
|
try {
|
||||||
|
holder = queue.take();
|
||||||
|
} catch(InterruptedException e) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
throw Throwables.propagate(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
// synchronize on item to ensure count cannot get changed by
|
||||||
|
// CountingHolder.close right after we put the item back in the queue
|
||||||
|
synchronized (holder) {
|
||||||
|
holder.count.incrementAndGet();
|
||||||
|
queue.offer(holder);
|
||||||
|
}
|
||||||
|
return holder;
|
||||||
|
}
|
||||||
|
|
||||||
|
private class CountingHolder implements ResourceHolder<T>, Comparable<CountingHolder>
|
||||||
|
{
|
||||||
|
private AtomicInteger count = new AtomicInteger(0);
|
||||||
|
private final T object;
|
||||||
|
|
||||||
|
public CountingHolder(final T object)
|
||||||
|
{
|
||||||
|
this.object = object;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public T get()
|
||||||
|
{
|
||||||
|
return object;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Not idempotent, should only be called once when done using the resource
|
||||||
|
*
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void close() throws IOException
|
||||||
|
{
|
||||||
|
// ensures count always gets adjusted while item is removed from the queue
|
||||||
|
synchronized (this) {
|
||||||
|
// item may not be in queue if another thread is calling LoadBalancingPool.get()
|
||||||
|
// at the same time; in that case let the other thread put it back.
|
||||||
|
boolean removed = queue.remove(this);
|
||||||
|
count.decrementAndGet();
|
||||||
|
if (removed) {
|
||||||
|
queue.offer(this);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int compareTo(CountingHolder o)
|
||||||
|
{
|
||||||
|
return Integer.compare(count.get(), o.count.get());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void finalize() throws Throwable
|
||||||
|
{
|
||||||
|
try {
|
||||||
|
final int shouldBeZero = count.get();
|
||||||
|
if (shouldBeZero != 0) {
|
||||||
|
log.warn("Expected 0 resource count, got [%d]! Object was[%s].", shouldBeZero, object);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
finally {
|
||||||
|
super.finalize();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -23,5 +23,5 @@ import java.io.Closeable;
|
|||||||
*/
|
*/
|
||||||
public interface ResourceHolder<T> extends Closeable
|
public interface ResourceHolder<T> extends Closeable
|
||||||
{
|
{
|
||||||
public T get();
|
T get();
|
||||||
}
|
}
|
||||||
|
@ -109,3 +109,4 @@ You can optionally only configure caching to be enabled on the broker by setting
|
|||||||
|`druid.cache.hosts`|Command separated list of Memcached hosts `<host:port>`.|none|
|
|`druid.cache.hosts`|Command separated list of Memcached hosts `<host:port>`.|none|
|
||||||
|`druid.cache.maxObjectSize`|Maximum object size in bytes for a Memcached object.|52428800 (50 MB)|
|
|`druid.cache.maxObjectSize`|Maximum object size in bytes for a Memcached object.|52428800 (50 MB)|
|
||||||
|`druid.cache.memcachedPrefix`|Key prefix for all keys in Memcached.|druid|
|
|`druid.cache.memcachedPrefix`|Key prefix for all keys in Memcached.|druid|
|
||||||
|
|`druid.cache.numConnections`|Number of memcached connections to use.|1|
|
||||||
|
@ -21,6 +21,8 @@ import com.google.common.base.Charsets;
|
|||||||
import com.google.common.base.Function;
|
import com.google.common.base.Function;
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
import com.google.common.base.Predicate;
|
import com.google.common.base.Predicate;
|
||||||
|
import com.google.common.base.Supplier;
|
||||||
|
import com.google.common.base.Suppliers;
|
||||||
import com.google.common.base.Throwables;
|
import com.google.common.base.Throwables;
|
||||||
import com.google.common.collect.ImmutableMap;
|
import com.google.common.collect.ImmutableMap;
|
||||||
import com.google.common.collect.Maps;
|
import com.google.common.collect.Maps;
|
||||||
@ -32,7 +34,11 @@ import com.metamx.emitter.service.ServiceEmitter;
|
|||||||
import com.metamx.emitter.service.ServiceMetricEvent;
|
import com.metamx.emitter.service.ServiceMetricEvent;
|
||||||
import com.metamx.metrics.AbstractMonitor;
|
import com.metamx.metrics.AbstractMonitor;
|
||||||
import com.metamx.metrics.MonitorScheduler;
|
import com.metamx.metrics.MonitorScheduler;
|
||||||
|
import io.druid.collections.LoadBalancingPool;
|
||||||
|
import io.druid.collections.ResourceHolder;
|
||||||
|
import io.druid.collections.StupidResourceHolder;
|
||||||
import net.spy.memcached.AddrUtil;
|
import net.spy.memcached.AddrUtil;
|
||||||
|
import net.spy.memcached.ConnectionFactory;
|
||||||
import net.spy.memcached.ConnectionFactoryBuilder;
|
import net.spy.memcached.ConnectionFactoryBuilder;
|
||||||
import net.spy.memcached.FailureMode;
|
import net.spy.memcached.FailureMode;
|
||||||
import net.spy.memcached.HashAlgorithm;
|
import net.spy.memcached.HashAlgorithm;
|
||||||
@ -40,16 +46,17 @@ import net.spy.memcached.MemcachedClient;
|
|||||||
import net.spy.memcached.MemcachedClientIF;
|
import net.spy.memcached.MemcachedClientIF;
|
||||||
import net.spy.memcached.internal.BulkFuture;
|
import net.spy.memcached.internal.BulkFuture;
|
||||||
import net.spy.memcached.metrics.MetricCollector;
|
import net.spy.memcached.metrics.MetricCollector;
|
||||||
import net.spy.memcached.metrics.MetricType;
|
|
||||||
import net.spy.memcached.ops.LinkedOperationQueueFactory;
|
import net.spy.memcached.ops.LinkedOperationQueueFactory;
|
||||||
import net.spy.memcached.ops.OperationQueueFactory;
|
import net.spy.memcached.ops.OperationQueueFactory;
|
||||||
import org.apache.commons.codec.digest.DigestUtils;
|
import org.apache.commons.codec.digest.DigestUtils;
|
||||||
|
|
||||||
import javax.annotation.Nullable;
|
import javax.annotation.Nullable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
@ -154,184 +161,206 @@ public class MemcachedCache implements Cache
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
return new MemcachedCache(
|
final MetricCollector metricCollector = new MetricCollector()
|
||||||
new MemcachedClient(
|
{
|
||||||
new MemcachedCustomConnectionFactoryBuilder()
|
@Override
|
||||||
// 1000 repetitions gives us good distribution with murmur3_128
|
public void addCounter(String name)
|
||||||
// (approx < 5% difference in counts across nodes, with 5 cache nodes)
|
{
|
||||||
.setKetamaNodeRepetitions(1000)
|
if (!interesting.apply(name)) {
|
||||||
.setHashAlg(MURMUR3_128)
|
return;
|
||||||
.setProtocol(ConnectionFactoryBuilder.Protocol.BINARY)
|
}
|
||||||
.setLocatorType(ConnectionFactoryBuilder.Locator.CONSISTENT)
|
counters.put(name, new AtomicLong(0L));
|
||||||
.setDaemon(true)
|
|
||||||
.setFailureMode(FailureMode.Cancel)
|
|
||||||
.setTranscoder(transcoder)
|
|
||||||
.setShouldOptimize(true)
|
|
||||||
.setOpQueueMaxBlockTime(config.getTimeout())
|
|
||||||
.setOpTimeout(config.getTimeout())
|
|
||||||
.setReadBufferSize(config.getReadBufferSize())
|
|
||||||
.setOpQueueFactory(opQueueFactory)
|
|
||||||
.setEnableMetrics(MetricType.DEBUG) // Not as scary as it sounds
|
|
||||||
.setWriteOpQueueFactory(opQueueFactory)
|
|
||||||
.setReadOpQueueFactory(opQueueFactory)
|
|
||||||
.setMetricCollector(
|
|
||||||
new MetricCollector()
|
|
||||||
{
|
|
||||||
@Override
|
|
||||||
public void addCounter(String name)
|
|
||||||
{
|
|
||||||
if (!interesting.apply(name)) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
counters.put(name, new AtomicLong(0L));
|
|
||||||
|
|
||||||
if (log.isDebugEnabled()) {
|
if (log.isDebugEnabled()) {
|
||||||
log.debug("Add Counter [%s]", name);
|
log.debug("Add Counter [%s]", name);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void removeCounter(String name)
|
public void removeCounter(String name)
|
||||||
{
|
{
|
||||||
if (!interesting.apply(name)) {
|
if (!interesting.apply(name)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
counters.remove(name);
|
counters.remove(name);
|
||||||
|
|
||||||
if (log.isDebugEnabled()) {
|
if (log.isDebugEnabled()) {
|
||||||
log.debug("Remove Counter [%s]", name);
|
log.debug("Remove Counter [%s]", name);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void incrementCounter(String name)
|
public void incrementCounter(String name)
|
||||||
{
|
{
|
||||||
if (!interesting.apply(name)) {
|
if (!interesting.apply(name)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
AtomicLong counter = counters.get(name);
|
AtomicLong counter = counters.get(name);
|
||||||
if (counter == null) {
|
if (counter == null) {
|
||||||
counters.putIfAbsent(name, new AtomicLong(0));
|
counters.putIfAbsent(name, new AtomicLong(0));
|
||||||
counter = counters.get(name);
|
counter = counters.get(name);
|
||||||
}
|
}
|
||||||
counter.incrementAndGet();
|
counter.incrementAndGet();
|
||||||
|
|
||||||
if (log.isDebugEnabled()) {
|
if (log.isDebugEnabled()) {
|
||||||
log.debug("Incrament [%s]", name);
|
log.debug("Increment [%s]", name);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void incrementCounter(String name, int amount)
|
public void incrementCounter(String name, int amount)
|
||||||
{
|
{
|
||||||
if (!interesting.apply(name)) {
|
if (!interesting.apply(name)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
AtomicLong counter = counters.get(name);
|
AtomicLong counter = counters.get(name);
|
||||||
if (counter == null) {
|
if (counter == null) {
|
||||||
counters.putIfAbsent(name, new AtomicLong(0));
|
counters.putIfAbsent(name, new AtomicLong(0));
|
||||||
counter = counters.get(name);
|
counter = counters.get(name);
|
||||||
}
|
}
|
||||||
counter.addAndGet(amount);
|
counter.addAndGet(amount);
|
||||||
|
|
||||||
if (log.isDebugEnabled()) {
|
if (log.isDebugEnabled()) {
|
||||||
log.debug("Increment [%s] %d", name, amount);
|
log.debug("Increment [%s] %d", name, amount);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void decrementCounter(String name)
|
public void decrementCounter(String name)
|
||||||
{
|
{
|
||||||
if (!interesting.apply(name)) {
|
if (!interesting.apply(name)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
AtomicLong counter = counters.get(name);
|
AtomicLong counter = counters.get(name);
|
||||||
if (counter == null) {
|
if (counter == null) {
|
||||||
counters.putIfAbsent(name, new AtomicLong(0));
|
counters.putIfAbsent(name, new AtomicLong(0));
|
||||||
counter = counters.get(name);
|
counter = counters.get(name);
|
||||||
}
|
}
|
||||||
counter.decrementAndGet();
|
counter.decrementAndGet();
|
||||||
|
|
||||||
if (log.isDebugEnabled()) {
|
if (log.isDebugEnabled()) {
|
||||||
log.debug("Decrement [%s]", name);
|
log.debug("Decrement [%s]", name);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void decrementCounter(String name, int amount)
|
public void decrementCounter(String name, int amount)
|
||||||
{
|
{
|
||||||
if (!interesting.apply(name)) {
|
if (!interesting.apply(name)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
AtomicLong counter = counters.get(name);
|
AtomicLong counter = counters.get(name);
|
||||||
if (counter == null) {
|
if (counter == null) {
|
||||||
counters.putIfAbsent(name, new AtomicLong(0L));
|
counters.putIfAbsent(name, new AtomicLong(0L));
|
||||||
counter = counters.get(name);
|
counter = counters.get(name);
|
||||||
}
|
}
|
||||||
counter.addAndGet(-amount);
|
counter.addAndGet(-amount);
|
||||||
|
|
||||||
if (log.isDebugEnabled()) {
|
if (log.isDebugEnabled()) {
|
||||||
log.debug("Decrement [%s] %d", name, amount);
|
log.debug("Decrement [%s] %d", name, amount);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void addMeter(String name)
|
public void addMeter(String name)
|
||||||
{
|
{
|
||||||
meters.put(name, new AtomicLong(0L));
|
meters.put(name, new AtomicLong(0L));
|
||||||
if (log.isDebugEnabled()) {
|
if (log.isDebugEnabled()) {
|
||||||
log.debug("Adding meter [%s]", name);
|
log.debug("Adding meter [%s]", name);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void removeMeter(String name)
|
public void removeMeter(String name)
|
||||||
{
|
{
|
||||||
meters.remove(name);
|
meters.remove(name);
|
||||||
if (log.isDebugEnabled()) {
|
if (log.isDebugEnabled()) {
|
||||||
log.debug("Removing meter [%s]", name);
|
log.debug("Removing meter [%s]", name);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void markMeter(String name)
|
public void markMeter(String name)
|
||||||
{
|
{
|
||||||
AtomicLong meter = meters.get(name);
|
AtomicLong meter = meters.get(name);
|
||||||
if (meter == null) {
|
if (meter == null) {
|
||||||
meters.putIfAbsent(name, new AtomicLong(0L));
|
meters.putIfAbsent(name, new AtomicLong(0L));
|
||||||
meter = meters.get(name);
|
meter = meters.get(name);
|
||||||
}
|
}
|
||||||
meter.incrementAndGet();
|
meter.incrementAndGet();
|
||||||
|
|
||||||
if (log.isDebugEnabled()) {
|
if (log.isDebugEnabled()) {
|
||||||
log.debug("Increment counter [%s]", name);
|
log.debug("Increment counter [%s]", name);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void addHistogram(String name)
|
public void addHistogram(String name)
|
||||||
{
|
{
|
||||||
log.debug("Ignoring add histogram [%s]", name);
|
log.debug("Ignoring add histogram [%s]", name);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void removeHistogram(String name)
|
public void removeHistogram(String name)
|
||||||
{
|
{
|
||||||
log.debug("Ignoring remove histogram [%s]", name);
|
log.debug("Ignoring remove histogram [%s]", name);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void updateHistogram(String name, int amount)
|
public void updateHistogram(String name, int amount)
|
||||||
{
|
{
|
||||||
log.debug("Ignoring update histogram [%s]: %d", name, amount);
|
log.debug("Ignoring update histogram [%s]: %d", name, amount);
|
||||||
}
|
}
|
||||||
}
|
};
|
||||||
)
|
|
||||||
.build(),
|
final ConnectionFactory connectionFactory = new MemcachedCustomConnectionFactoryBuilder()
|
||||||
AddrUtil.getAddresses(config.getHosts())
|
// 1000 repetitions gives us good distribution with murmur3_128
|
||||||
),
|
// (approx < 5% difference in counts across nodes, with 5 cache nodes)
|
||||||
config
|
.setKetamaNodeRepetitions(1000)
|
||||||
);
|
.setHashAlg(MURMUR3_128)
|
||||||
|
.setProtocol(ConnectionFactoryBuilder.Protocol.BINARY)
|
||||||
|
.setLocatorType(ConnectionFactoryBuilder.Locator.CONSISTENT)
|
||||||
|
.setDaemon(true)
|
||||||
|
.setFailureMode(FailureMode.Cancel)
|
||||||
|
.setTranscoder(transcoder)
|
||||||
|
.setShouldOptimize(true)
|
||||||
|
.setOpQueueMaxBlockTime(config.getTimeout())
|
||||||
|
.setOpTimeout(config.getTimeout())
|
||||||
|
.setReadBufferSize(config.getReadBufferSize())
|
||||||
|
.setOpQueueFactory(opQueueFactory)
|
||||||
|
.setMetricCollector(metricCollector)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
final List<InetSocketAddress> hosts = AddrUtil.getAddresses(config.getHosts());
|
||||||
|
|
||||||
|
|
||||||
|
final Supplier<ResourceHolder<MemcachedClientIF>> clientSupplier;
|
||||||
|
|
||||||
|
if (config.getNumConnections() > 1) {
|
||||||
|
clientSupplier = new LoadBalancingPool<MemcachedClientIF>(
|
||||||
|
config.getNumConnections(),
|
||||||
|
new Supplier<MemcachedClientIF>()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public MemcachedClientIF get()
|
||||||
|
{
|
||||||
|
try {
|
||||||
|
return new MemcachedClient(connectionFactory, hosts);
|
||||||
|
}
|
||||||
|
catch (IOException e) {
|
||||||
|
log.error(e, "Unable to create memcached client");
|
||||||
|
throw Throwables.propagate(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
clientSupplier = Suppliers.<ResourceHolder<MemcachedClientIF>>ofInstance(
|
||||||
|
StupidResourceHolder.<MemcachedClientIF>create(new MemcachedClient(connectionFactory, hosts))
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
return new MemcachedCache(clientSupplier, config);
|
||||||
}
|
}
|
||||||
catch (IOException e) {
|
catch (IOException e) {
|
||||||
throw Throwables.propagate(e);
|
throw Throwables.propagate(e);
|
||||||
@ -342,7 +371,7 @@ public class MemcachedCache implements Cache
|
|||||||
private final int expiration;
|
private final int expiration;
|
||||||
private final String memcachedPrefix;
|
private final String memcachedPrefix;
|
||||||
|
|
||||||
private final MemcachedClientIF client;
|
private final Supplier<ResourceHolder<MemcachedClientIF>> client;
|
||||||
|
|
||||||
private final AtomicLong hitCount = new AtomicLong(0);
|
private final AtomicLong hitCount = new AtomicLong(0);
|
||||||
private final AtomicLong missCount = new AtomicLong(0);
|
private final AtomicLong missCount = new AtomicLong(0);
|
||||||
@ -350,7 +379,7 @@ public class MemcachedCache implements Cache
|
|||||||
private final AtomicLong errorCount = new AtomicLong(0);
|
private final AtomicLong errorCount = new AtomicLong(0);
|
||||||
|
|
||||||
|
|
||||||
MemcachedCache(MemcachedClientIF client, MemcachedCacheConfig config)
|
MemcachedCache(Supplier<ResourceHolder<MemcachedClientIF>> client, MemcachedCacheConfig config)
|
||||||
{
|
{
|
||||||
Preconditions.checkArgument(
|
Preconditions.checkArgument(
|
||||||
config.getMemcachedPrefix().length() <= MAX_PREFIX_LENGTH,
|
config.getMemcachedPrefix().length() <= MAX_PREFIX_LENGTH,
|
||||||
@ -381,52 +410,64 @@ public class MemcachedCache implements Cache
|
|||||||
@Override
|
@Override
|
||||||
public byte[] get(NamedKey key)
|
public byte[] get(NamedKey key)
|
||||||
{
|
{
|
||||||
Future<Object> future;
|
try (ResourceHolder<MemcachedClientIF> clientHolder = client.get()) {
|
||||||
try {
|
Future<Object> future;
|
||||||
future = client.asyncGet(computeKeyHash(memcachedPrefix, key));
|
try {
|
||||||
}
|
future = clientHolder.get().asyncGet(computeKeyHash(memcachedPrefix, key));
|
||||||
catch (IllegalStateException e) {
|
}
|
||||||
// operation did not get queued in time (queue is full)
|
catch (IllegalStateException e) {
|
||||||
errorCount.incrementAndGet();
|
// operation did not get queued in time (queue is full)
|
||||||
log.warn(e, "Unable to queue cache operation");
|
errorCount.incrementAndGet();
|
||||||
return null;
|
log.warn(e, "Unable to queue cache operation");
|
||||||
}
|
return null;
|
||||||
try {
|
}
|
||||||
byte[] bytes = (byte[]) future.get(timeout, TimeUnit.MILLISECONDS);
|
try {
|
||||||
if (bytes != null) {
|
byte[] bytes = (byte[]) future.get(timeout, TimeUnit.MILLISECONDS);
|
||||||
hitCount.incrementAndGet();
|
if (bytes != null) {
|
||||||
} else {
|
hitCount.incrementAndGet();
|
||||||
missCount.incrementAndGet();
|
} else {
|
||||||
|
missCount.incrementAndGet();
|
||||||
|
}
|
||||||
|
return bytes == null ? null : deserializeValue(key, bytes);
|
||||||
|
}
|
||||||
|
catch (TimeoutException e) {
|
||||||
|
timeoutCount.incrementAndGet();
|
||||||
|
future.cancel(false);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
catch (InterruptedException e) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
throw Throwables.propagate(e);
|
||||||
|
}
|
||||||
|
catch (ExecutionException e) {
|
||||||
|
errorCount.incrementAndGet();
|
||||||
|
log.warn(e, "Exception pulling item from cache");
|
||||||
|
return null;
|
||||||
}
|
}
|
||||||
return bytes == null ? null : deserializeValue(key, bytes);
|
|
||||||
}
|
}
|
||||||
catch (TimeoutException e) {
|
catch (IOException e) {
|
||||||
timeoutCount.incrementAndGet();
|
|
||||||
future.cancel(false);
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
catch (InterruptedException e) {
|
|
||||||
Thread.currentThread().interrupt();
|
|
||||||
throw Throwables.propagate(e);
|
throw Throwables.propagate(e);
|
||||||
}
|
}
|
||||||
catch (ExecutionException e) {
|
|
||||||
errorCount.incrementAndGet();
|
|
||||||
log.warn(e, "Exception pulling item from cache");
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void put(NamedKey key, byte[] value)
|
public void put(NamedKey key, byte[] value)
|
||||||
{
|
{
|
||||||
try {
|
try (final ResourceHolder<MemcachedClientIF> clientHolder = client.get()) {
|
||||||
client.set(computeKeyHash(memcachedPrefix, key), expiration, serializeValue(key, value));
|
clientHolder.get().set(
|
||||||
|
computeKeyHash(memcachedPrefix, key),
|
||||||
|
expiration,
|
||||||
|
serializeValue(key, value)
|
||||||
|
);
|
||||||
}
|
}
|
||||||
catch (IllegalStateException e) {
|
catch (IllegalStateException e) {
|
||||||
// operation did not get queued in time (queue is full)
|
// operation did not get queued in time (queue is full)
|
||||||
errorCount.incrementAndGet();
|
errorCount.incrementAndGet();
|
||||||
log.warn(e, "Unable to queue cache operation");
|
log.warn(e, "Unable to queue cache operation");
|
||||||
}
|
}
|
||||||
|
catch (IOException e) {
|
||||||
|
Throwables.propagate(e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static byte[] serializeValue(NamedKey key, byte[] value)
|
private static byte[] serializeValue(NamedKey key, byte[] value)
|
||||||
@ -459,63 +500,68 @@ public class MemcachedCache implements Cache
|
|||||||
@Override
|
@Override
|
||||||
public Map<NamedKey, byte[]> getBulk(Iterable<NamedKey> keys)
|
public Map<NamedKey, byte[]> getBulk(Iterable<NamedKey> keys)
|
||||||
{
|
{
|
||||||
Map<String, NamedKey> keyLookup = Maps.uniqueIndex(
|
try (ResourceHolder<MemcachedClientIF> clientHolder = client.get()) {
|
||||||
keys,
|
Map<String, NamedKey> keyLookup = Maps.uniqueIndex(
|
||||||
new Function<NamedKey, String>()
|
keys,
|
||||||
{
|
new Function<NamedKey, String>()
|
||||||
@Override
|
|
||||||
public String apply(
|
|
||||||
@Nullable NamedKey input
|
|
||||||
)
|
|
||||||
{
|
{
|
||||||
return computeKeyHash(memcachedPrefix, input);
|
@Override
|
||||||
|
public String apply(
|
||||||
|
@Nullable NamedKey input
|
||||||
|
)
|
||||||
|
{
|
||||||
|
return computeKeyHash(memcachedPrefix, input);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
Map<NamedKey, byte[]> results = Maps.newHashMap();
|
||||||
|
|
||||||
|
BulkFuture<Map<String, Object>> future;
|
||||||
|
try {
|
||||||
|
future = clientHolder.get().asyncGetBulk(keyLookup.keySet());
|
||||||
|
}
|
||||||
|
catch (IllegalStateException e) {
|
||||||
|
// operation did not get queued in time (queue is full)
|
||||||
|
errorCount.incrementAndGet();
|
||||||
|
log.warn(e, "Unable to queue cache operation");
|
||||||
|
return results;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
Map<String, Object> some = future.getSome(timeout, TimeUnit.MILLISECONDS);
|
||||||
|
|
||||||
|
if (future.isTimeout()) {
|
||||||
|
future.cancel(false);
|
||||||
|
timeoutCount.incrementAndGet();
|
||||||
}
|
}
|
||||||
);
|
missCount.addAndGet(keyLookup.size() - some.size());
|
||||||
|
hitCount.addAndGet(some.size());
|
||||||
|
|
||||||
Map<NamedKey, byte[]> results = Maps.newHashMap();
|
for (Map.Entry<String, Object> entry : some.entrySet()) {
|
||||||
|
final NamedKey key = keyLookup.get(entry.getKey());
|
||||||
|
final byte[] value = (byte[]) entry.getValue();
|
||||||
|
results.put(
|
||||||
|
key,
|
||||||
|
value == null ? null : deserializeValue(key, value)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
BulkFuture<Map<String, Object>> future;
|
return results;
|
||||||
try {
|
|
||||||
future = client.asyncGetBulk(keyLookup.keySet());
|
|
||||||
}
|
|
||||||
catch (IllegalStateException e) {
|
|
||||||
// operation did not get queued in time (queue is full)
|
|
||||||
errorCount.incrementAndGet();
|
|
||||||
log.warn(e, "Unable to queue cache operation");
|
|
||||||
return results;
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
Map<String, Object> some = future.getSome(timeout, TimeUnit.MILLISECONDS);
|
|
||||||
|
|
||||||
if (future.isTimeout()) {
|
|
||||||
future.cancel(false);
|
|
||||||
timeoutCount.incrementAndGet();
|
|
||||||
}
|
}
|
||||||
missCount.addAndGet(keyLookup.size() - some.size());
|
catch (InterruptedException e) {
|
||||||
hitCount.addAndGet(some.size());
|
Thread.currentThread().interrupt();
|
||||||
|
throw Throwables.propagate(e);
|
||||||
for (Map.Entry<String, Object> entry : some.entrySet()) {
|
}
|
||||||
final NamedKey key = keyLookup.get(entry.getKey());
|
catch (ExecutionException e) {
|
||||||
final byte[] value = (byte[]) entry.getValue();
|
errorCount.incrementAndGet();
|
||||||
results.put(
|
log.warn(e, "Exception pulling item from cache");
|
||||||
key,
|
return results;
|
||||||
value == null ? null : deserializeValue(key, value)
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return results;
|
|
||||||
}
|
}
|
||||||
catch (InterruptedException e) {
|
catch (IOException e) {
|
||||||
Thread.currentThread().interrupt();
|
|
||||||
throw Throwables.propagate(e);
|
throw Throwables.propagate(e);
|
||||||
}
|
}
|
||||||
catch (ExecutionException e) {
|
|
||||||
errorCount.incrementAndGet();
|
|
||||||
log.warn(e, "Exception pulling item from cache");
|
|
||||||
return results;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -51,6 +51,10 @@ public class MemcachedCacheConfig
|
|||||||
@JsonProperty
|
@JsonProperty
|
||||||
private long maxOperationQueueSize = 0;
|
private long maxOperationQueueSize = 0;
|
||||||
|
|
||||||
|
// size of memcached connection pool
|
||||||
|
@JsonProperty
|
||||||
|
private int numConnections = 1;
|
||||||
|
|
||||||
public int getExpiration()
|
public int getExpiration()
|
||||||
{
|
{
|
||||||
return expiration;
|
return expiration;
|
||||||
@ -85,4 +89,9 @@ public class MemcachedCacheConfig
|
|||||||
{
|
{
|
||||||
return readBufferSize;
|
return readBufferSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int getNumConnections()
|
||||||
|
{
|
||||||
|
return numConnections;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -20,7 +20,10 @@ package io.druid.client.cache;
|
|||||||
import com.google.caliper.Param;
|
import com.google.caliper.Param;
|
||||||
import com.google.caliper.Runner;
|
import com.google.caliper.Runner;
|
||||||
import com.google.caliper.SimpleBenchmark;
|
import com.google.caliper.SimpleBenchmark;
|
||||||
|
import com.google.common.base.Suppliers;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
|
import io.druid.collections.ResourceHolder;
|
||||||
|
import io.druid.collections.StupidResourceHolder;
|
||||||
import net.spy.memcached.AddrUtil;
|
import net.spy.memcached.AddrUtil;
|
||||||
import net.spy.memcached.ConnectionFactoryBuilder;
|
import net.spy.memcached.ConnectionFactoryBuilder;
|
||||||
import net.spy.memcached.DefaultHashAlgorithm;
|
import net.spy.memcached.DefaultHashAlgorithm;
|
||||||
@ -77,7 +80,9 @@ public class MemcachedCacheBenchmark extends SimpleBenchmark
|
|||||||
|
|
||||||
|
|
||||||
cache = new MemcachedCache(
|
cache = new MemcachedCache(
|
||||||
client,
|
Suppliers.<ResourceHolder<MemcachedClientIF>>ofInstance(
|
||||||
|
StupidResourceHolder.create(client)
|
||||||
|
),
|
||||||
new MemcachedCacheConfig()
|
new MemcachedCacheConfig()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
|
@ -18,6 +18,7 @@
|
|||||||
package io.druid.client.cache;
|
package io.druid.client.cache;
|
||||||
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import com.google.common.base.Suppliers;
|
||||||
import com.google.common.collect.ImmutableList;
|
import com.google.common.collect.ImmutableList;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import com.google.common.collect.Maps;
|
import com.google.common.collect.Maps;
|
||||||
@ -34,6 +35,8 @@ import com.metamx.emitter.core.Event;
|
|||||||
import com.metamx.emitter.service.ServiceEmitter;
|
import com.metamx.emitter.service.ServiceEmitter;
|
||||||
import com.metamx.metrics.Monitor;
|
import com.metamx.metrics.Monitor;
|
||||||
import com.metamx.metrics.MonitorScheduler;
|
import com.metamx.metrics.MonitorScheduler;
|
||||||
|
import io.druid.collections.ResourceHolder;
|
||||||
|
import io.druid.collections.StupidResourceHolder;
|
||||||
import io.druid.guice.GuiceInjectors;
|
import io.druid.guice.GuiceInjectors;
|
||||||
import io.druid.guice.ManageLifecycle;
|
import io.druid.guice.ManageLifecycle;
|
||||||
import io.druid.initialization.Initialization;
|
import io.druid.initialization.Initialization;
|
||||||
@ -110,8 +113,12 @@ public class MemcachedCacheTest
|
|||||||
@Before
|
@Before
|
||||||
public void setUp() throws Exception
|
public void setUp() throws Exception
|
||||||
{
|
{
|
||||||
MemcachedClientIF client = new MockMemcachedClient();
|
cache = new MemcachedCache(
|
||||||
cache = new MemcachedCache(client, memcachedCacheConfig);
|
Suppliers.<ResourceHolder<MemcachedClientIF>>ofInstance(
|
||||||
|
StupidResourceHolder.<MemcachedClientIF>create(new MockMemcachedClient())
|
||||||
|
),
|
||||||
|
memcachedCacheConfig
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
Loading…
x
Reference in New Issue
Block a user