From 1f897257b5a8c081215aa0c352e46272006e2f0c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Wed, 16 Sep 2015 14:35:02 -0700 Subject: [PATCH 1/2] add simple load balancing pool --- .../druid/collections/LoadBalancingPool.java | 146 ++++++++++++++++++ .../io/druid/collections/ResourceHolder.java | 2 +- 2 files changed, 147 insertions(+), 1 deletion(-) create mode 100644 common/src/main/java/io/druid/collections/LoadBalancingPool.java diff --git a/common/src/main/java/io/druid/collections/LoadBalancingPool.java b/common/src/main/java/io/druid/collections/LoadBalancingPool.java new file mode 100644 index 00000000000..4fbc558eebd --- /dev/null +++ b/common/src/main/java/io/druid/collections/LoadBalancingPool.java @@ -0,0 +1,146 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.collections; + +import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; +import com.google.common.base.Throwables; +import com.metamx.common.logger.Logger; + +import java.io.IOException; +import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Simple load balancing pool that always returns the least used item. + * + * An item's usage is incremented every time one gets requested from the pool + * and is decremented every time close is called on the holder. + * + * The pool eagerly instantiates all the items in the pool when created, + * using the given supplier. + * + * @param type of items to pool + */ +public class LoadBalancingPool implements Supplier> +{ + private static final Logger log = new Logger(LoadBalancingPool.class); + + private final Supplier generator; + private final int capacity; + private final PriorityBlockingQueue queue; + + public LoadBalancingPool(int capacity, Supplier generator) + { + Preconditions.checkArgument(capacity > 0, "capacity must be greater than 0"); + Preconditions.checkNotNull(generator); + + this.generator = generator; + this.capacity = capacity; + this.queue = new PriorityBlockingQueue<>(capacity); + + // eagerly intantiate all items in the pool + init(); + } + + private void init() { + for(int i = 0; i < capacity; ++i) { + queue.offer(new CountingHolder(generator.get())); + } + } + + public ResourceHolder get() + { + final CountingHolder holder; + // items never stay out of the queue for long, so we'll get one eventually + try { + holder = queue.take(); + } catch(InterruptedException e) { + Thread.currentThread().interrupt(); + throw Throwables.propagate(e); + } + + // synchronize on item to ensure count cannot get changed by + // CountingHolder.close right after we put the item back in the queue + synchronized (holder) { + holder.count.incrementAndGet(); + queue.offer(holder); + } + return holder; + } + + private class CountingHolder implements ResourceHolder, Comparable + { + private AtomicInteger count = new AtomicInteger(0); + private final T object; + + public CountingHolder(final T object) + { + this.object = object; + } + + @Override + public T get() + { + return object; + } + + /** + * Not idempotent, should only be called once when done using the resource + * + * @throws IOException + */ + @Override + public void close() throws IOException + { + // ensures count always gets adjusted while item is removed from the queue + synchronized (this) { + // item may not be in queue if another thread is calling LoadBalancingPool.get() + // at the same time; in that case let the other thread put it back. + boolean removed = queue.remove(this); + count.decrementAndGet(); + if (removed) { + queue.offer(this); + } + } + } + + @Override + public int compareTo(CountingHolder o) + { + return Integer.compare(count.get(), o.count.get()); + } + + + @Override + protected void finalize() throws Throwable + { + try { + final int shouldBeZero = count.get(); + if (shouldBeZero != 0) { + log.warn("Expected 0 resource count, got [%d]! Object was[%s].", shouldBeZero, object); + } + } + finally { + super.finalize(); + } + } + } +} diff --git a/common/src/main/java/io/druid/collections/ResourceHolder.java b/common/src/main/java/io/druid/collections/ResourceHolder.java index b8589212450..6fa3be80c2d 100644 --- a/common/src/main/java/io/druid/collections/ResourceHolder.java +++ b/common/src/main/java/io/druid/collections/ResourceHolder.java @@ -23,5 +23,5 @@ import java.io.Closeable; */ public interface ResourceHolder extends Closeable { - public T get(); + T get(); } From df4c8a3aa5da490a427adca80e6eec7f7b59cc59 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Wed, 16 Sep 2015 10:49:19 -0700 Subject: [PATCH 2/2] balance memcached cache across multiple connections --- docs/content/configuration/broker.md | 1 + .../io/druid/client/cache/MemcachedCache.java | 528 ++++++++++-------- .../client/cache/MemcachedCacheConfig.java | 9 + .../client/cache/MemcachedCacheBenchmark.java | 7 +- .../client/cache/MemcachedCacheTest.java | 11 +- 5 files changed, 312 insertions(+), 244 deletions(-) diff --git a/docs/content/configuration/broker.md b/docs/content/configuration/broker.md index 4a548a0952e..1ade1eb06df 100644 --- a/docs/content/configuration/broker.md +++ b/docs/content/configuration/broker.md @@ -109,3 +109,4 @@ You can optionally only configure caching to be enabled on the broker by setting |`druid.cache.hosts`|Command separated list of Memcached hosts ``.|none| |`druid.cache.maxObjectSize`|Maximum object size in bytes for a Memcached object.|52428800 (50 MB)| |`druid.cache.memcachedPrefix`|Key prefix for all keys in Memcached.|druid| +|`druid.cache.numConnections`|Number of memcached connections to use.|1| diff --git a/server/src/main/java/io/druid/client/cache/MemcachedCache.java b/server/src/main/java/io/druid/client/cache/MemcachedCache.java index 363e6ad4d37..d290f7f2405 100644 --- a/server/src/main/java/io/druid/client/cache/MemcachedCache.java +++ b/server/src/main/java/io/druid/client/cache/MemcachedCache.java @@ -21,6 +21,8 @@ import com.google.common.base.Charsets; import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.base.Predicate; +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; @@ -32,7 +34,11 @@ import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceMetricEvent; import com.metamx.metrics.AbstractMonitor; import com.metamx.metrics.MonitorScheduler; +import io.druid.collections.LoadBalancingPool; +import io.druid.collections.ResourceHolder; +import io.druid.collections.StupidResourceHolder; import net.spy.memcached.AddrUtil; +import net.spy.memcached.ConnectionFactory; import net.spy.memcached.ConnectionFactoryBuilder; import net.spy.memcached.FailureMode; import net.spy.memcached.HashAlgorithm; @@ -40,16 +46,17 @@ import net.spy.memcached.MemcachedClient; import net.spy.memcached.MemcachedClientIF; import net.spy.memcached.internal.BulkFuture; import net.spy.memcached.metrics.MetricCollector; -import net.spy.memcached.metrics.MetricType; import net.spy.memcached.ops.LinkedOperationQueueFactory; import net.spy.memcached.ops.OperationQueueFactory; import org.apache.commons.codec.digest.DigestUtils; import javax.annotation.Nullable; import java.io.IOException; +import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -154,184 +161,206 @@ public class MemcachedCache implements Cache } }; - return new MemcachedCache( - new MemcachedClient( - new MemcachedCustomConnectionFactoryBuilder() - // 1000 repetitions gives us good distribution with murmur3_128 - // (approx < 5% difference in counts across nodes, with 5 cache nodes) - .setKetamaNodeRepetitions(1000) - .setHashAlg(MURMUR3_128) - .setProtocol(ConnectionFactoryBuilder.Protocol.BINARY) - .setLocatorType(ConnectionFactoryBuilder.Locator.CONSISTENT) - .setDaemon(true) - .setFailureMode(FailureMode.Cancel) - .setTranscoder(transcoder) - .setShouldOptimize(true) - .setOpQueueMaxBlockTime(config.getTimeout()) - .setOpTimeout(config.getTimeout()) - .setReadBufferSize(config.getReadBufferSize()) - .setOpQueueFactory(opQueueFactory) - .setEnableMetrics(MetricType.DEBUG) // Not as scary as it sounds - .setWriteOpQueueFactory(opQueueFactory) - .setReadOpQueueFactory(opQueueFactory) - .setMetricCollector( - new MetricCollector() - { - @Override - public void addCounter(String name) - { - if (!interesting.apply(name)) { - return; - } - counters.put(name, new AtomicLong(0L)); + final MetricCollector metricCollector = new MetricCollector() + { + @Override + public void addCounter(String name) + { + if (!interesting.apply(name)) { + return; + } + counters.put(name, new AtomicLong(0L)); - if (log.isDebugEnabled()) { - log.debug("Add Counter [%s]", name); - } - } + if (log.isDebugEnabled()) { + log.debug("Add Counter [%s]", name); + } + } - @Override - public void removeCounter(String name) - { - if (!interesting.apply(name)) { - return; - } - counters.remove(name); + @Override + public void removeCounter(String name) + { + if (!interesting.apply(name)) { + return; + } + counters.remove(name); - if (log.isDebugEnabled()) { - log.debug("Remove Counter [%s]", name); - } - } + if (log.isDebugEnabled()) { + log.debug("Remove Counter [%s]", name); + } + } - @Override - public void incrementCounter(String name) - { - if (!interesting.apply(name)) { - return; - } - AtomicLong counter = counters.get(name); - if (counter == null) { - counters.putIfAbsent(name, new AtomicLong(0)); - counter = counters.get(name); - } - counter.incrementAndGet(); + @Override + public void incrementCounter(String name) + { + if (!interesting.apply(name)) { + return; + } + AtomicLong counter = counters.get(name); + if (counter == null) { + counters.putIfAbsent(name, new AtomicLong(0)); + counter = counters.get(name); + } + counter.incrementAndGet(); - if (log.isDebugEnabled()) { - log.debug("Incrament [%s]", name); - } - } + if (log.isDebugEnabled()) { + log.debug("Increment [%s]", name); + } + } - @Override - public void incrementCounter(String name, int amount) - { - if (!interesting.apply(name)) { - return; - } - AtomicLong counter = counters.get(name); - if (counter == null) { - counters.putIfAbsent(name, new AtomicLong(0)); - counter = counters.get(name); - } - counter.addAndGet(amount); + @Override + public void incrementCounter(String name, int amount) + { + if (!interesting.apply(name)) { + return; + } + AtomicLong counter = counters.get(name); + if (counter == null) { + counters.putIfAbsent(name, new AtomicLong(0)); + counter = counters.get(name); + } + counter.addAndGet(amount); - if (log.isDebugEnabled()) { - log.debug("Increment [%s] %d", name, amount); - } - } + if (log.isDebugEnabled()) { + log.debug("Increment [%s] %d", name, amount); + } + } - @Override - public void decrementCounter(String name) - { - if (!interesting.apply(name)) { - return; - } - AtomicLong counter = counters.get(name); - if (counter == null) { - counters.putIfAbsent(name, new AtomicLong(0)); - counter = counters.get(name); - } - counter.decrementAndGet(); + @Override + public void decrementCounter(String name) + { + if (!interesting.apply(name)) { + return; + } + AtomicLong counter = counters.get(name); + if (counter == null) { + counters.putIfAbsent(name, new AtomicLong(0)); + counter = counters.get(name); + } + counter.decrementAndGet(); - if (log.isDebugEnabled()) { - log.debug("Decrement [%s]", name); - } - } + if (log.isDebugEnabled()) { + log.debug("Decrement [%s]", name); + } + } - @Override - public void decrementCounter(String name, int amount) - { - if (!interesting.apply(name)) { - return; - } - AtomicLong counter = counters.get(name); - if (counter == null) { - counters.putIfAbsent(name, new AtomicLong(0L)); - counter = counters.get(name); - } - counter.addAndGet(-amount); + @Override + public void decrementCounter(String name, int amount) + { + if (!interesting.apply(name)) { + return; + } + AtomicLong counter = counters.get(name); + if (counter == null) { + counters.putIfAbsent(name, new AtomicLong(0L)); + counter = counters.get(name); + } + counter.addAndGet(-amount); - if (log.isDebugEnabled()) { - log.debug("Decrement [%s] %d", name, amount); - } - } + if (log.isDebugEnabled()) { + log.debug("Decrement [%s] %d", name, amount); + } + } - @Override - public void addMeter(String name) - { - meters.put(name, new AtomicLong(0L)); - if (log.isDebugEnabled()) { - log.debug("Adding meter [%s]", name); - } - } + @Override + public void addMeter(String name) + { + meters.put(name, new AtomicLong(0L)); + if (log.isDebugEnabled()) { + log.debug("Adding meter [%s]", name); + } + } - @Override - public void removeMeter(String name) - { - meters.remove(name); - if (log.isDebugEnabled()) { - log.debug("Removing meter [%s]", name); - } - } + @Override + public void removeMeter(String name) + { + meters.remove(name); + if (log.isDebugEnabled()) { + log.debug("Removing meter [%s]", name); + } + } - @Override - public void markMeter(String name) - { - AtomicLong meter = meters.get(name); - if (meter == null) { - meters.putIfAbsent(name, new AtomicLong(0L)); - meter = meters.get(name); - } - meter.incrementAndGet(); + @Override + public void markMeter(String name) + { + AtomicLong meter = meters.get(name); + if (meter == null) { + meters.putIfAbsent(name, new AtomicLong(0L)); + meter = meters.get(name); + } + meter.incrementAndGet(); - if (log.isDebugEnabled()) { - log.debug("Increment counter [%s]", name); - } - } + if (log.isDebugEnabled()) { + log.debug("Increment counter [%s]", name); + } + } - @Override - public void addHistogram(String name) - { - log.debug("Ignoring add histogram [%s]", name); - } + @Override + public void addHistogram(String name) + { + log.debug("Ignoring add histogram [%s]", name); + } - @Override - public void removeHistogram(String name) - { - log.debug("Ignoring remove histogram [%s]", name); - } + @Override + public void removeHistogram(String name) + { + log.debug("Ignoring remove histogram [%s]", name); + } - @Override - public void updateHistogram(String name, int amount) - { - log.debug("Ignoring update histogram [%s]: %d", name, amount); - } - } - ) - .build(), - AddrUtil.getAddresses(config.getHosts()) - ), - config - ); + @Override + public void updateHistogram(String name, int amount) + { + log.debug("Ignoring update histogram [%s]: %d", name, amount); + } + }; + + final ConnectionFactory connectionFactory = new MemcachedCustomConnectionFactoryBuilder() + // 1000 repetitions gives us good distribution with murmur3_128 + // (approx < 5% difference in counts across nodes, with 5 cache nodes) + .setKetamaNodeRepetitions(1000) + .setHashAlg(MURMUR3_128) + .setProtocol(ConnectionFactoryBuilder.Protocol.BINARY) + .setLocatorType(ConnectionFactoryBuilder.Locator.CONSISTENT) + .setDaemon(true) + .setFailureMode(FailureMode.Cancel) + .setTranscoder(transcoder) + .setShouldOptimize(true) + .setOpQueueMaxBlockTime(config.getTimeout()) + .setOpTimeout(config.getTimeout()) + .setReadBufferSize(config.getReadBufferSize()) + .setOpQueueFactory(opQueueFactory) + .setMetricCollector(metricCollector) + .build(); + + final List hosts = AddrUtil.getAddresses(config.getHosts()); + + + final Supplier> clientSupplier; + + if (config.getNumConnections() > 1) { + clientSupplier = new LoadBalancingPool( + config.getNumConnections(), + new Supplier() + { + @Override + public MemcachedClientIF get() + { + try { + return new MemcachedClient(connectionFactory, hosts); + } + catch (IOException e) { + log.error(e, "Unable to create memcached client"); + throw Throwables.propagate(e); + } + } + } + ); + } else { + clientSupplier = Suppliers.>ofInstance( + StupidResourceHolder.create(new MemcachedClient(connectionFactory, hosts)) + ); + } + + return new MemcachedCache(clientSupplier, config); } catch (IOException e) { throw Throwables.propagate(e); @@ -342,7 +371,7 @@ public class MemcachedCache implements Cache private final int expiration; private final String memcachedPrefix; - private final MemcachedClientIF client; + private final Supplier> client; private final AtomicLong hitCount = new AtomicLong(0); private final AtomicLong missCount = new AtomicLong(0); @@ -350,7 +379,7 @@ public class MemcachedCache implements Cache private final AtomicLong errorCount = new AtomicLong(0); - MemcachedCache(MemcachedClientIF client, MemcachedCacheConfig config) + MemcachedCache(Supplier> client, MemcachedCacheConfig config) { Preconditions.checkArgument( config.getMemcachedPrefix().length() <= MAX_PREFIX_LENGTH, @@ -381,52 +410,64 @@ public class MemcachedCache implements Cache @Override public byte[] get(NamedKey key) { - Future future; - try { - future = client.asyncGet(computeKeyHash(memcachedPrefix, key)); - } - catch (IllegalStateException e) { - // operation did not get queued in time (queue is full) - errorCount.incrementAndGet(); - log.warn(e, "Unable to queue cache operation"); - return null; - } - try { - byte[] bytes = (byte[]) future.get(timeout, TimeUnit.MILLISECONDS); - if (bytes != null) { - hitCount.incrementAndGet(); - } else { - missCount.incrementAndGet(); + try (ResourceHolder clientHolder = client.get()) { + Future future; + try { + future = clientHolder.get().asyncGet(computeKeyHash(memcachedPrefix, key)); + } + catch (IllegalStateException e) { + // operation did not get queued in time (queue is full) + errorCount.incrementAndGet(); + log.warn(e, "Unable to queue cache operation"); + return null; + } + try { + byte[] bytes = (byte[]) future.get(timeout, TimeUnit.MILLISECONDS); + if (bytes != null) { + hitCount.incrementAndGet(); + } else { + missCount.incrementAndGet(); + } + return bytes == null ? null : deserializeValue(key, bytes); + } + catch (TimeoutException e) { + timeoutCount.incrementAndGet(); + future.cancel(false); + return null; + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw Throwables.propagate(e); + } + catch (ExecutionException e) { + errorCount.incrementAndGet(); + log.warn(e, "Exception pulling item from cache"); + return null; } - return bytes == null ? null : deserializeValue(key, bytes); } - catch (TimeoutException e) { - timeoutCount.incrementAndGet(); - future.cancel(false); - return null; - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); + catch (IOException e) { throw Throwables.propagate(e); } - catch (ExecutionException e) { - errorCount.incrementAndGet(); - log.warn(e, "Exception pulling item from cache"); - return null; - } } @Override public void put(NamedKey key, byte[] value) { - try { - client.set(computeKeyHash(memcachedPrefix, key), expiration, serializeValue(key, value)); + try (final ResourceHolder clientHolder = client.get()) { + clientHolder.get().set( + computeKeyHash(memcachedPrefix, key), + expiration, + serializeValue(key, value) + ); } catch (IllegalStateException e) { // operation did not get queued in time (queue is full) errorCount.incrementAndGet(); log.warn(e, "Unable to queue cache operation"); } + catch (IOException e) { + Throwables.propagate(e); + } } private static byte[] serializeValue(NamedKey key, byte[] value) @@ -459,63 +500,68 @@ public class MemcachedCache implements Cache @Override public Map getBulk(Iterable keys) { - Map keyLookup = Maps.uniqueIndex( - keys, - new Function() - { - @Override - public String apply( - @Nullable NamedKey input - ) + try (ResourceHolder clientHolder = client.get()) { + Map keyLookup = Maps.uniqueIndex( + keys, + new Function() { - return computeKeyHash(memcachedPrefix, input); + @Override + public String apply( + @Nullable NamedKey input + ) + { + return computeKeyHash(memcachedPrefix, input); + } } + ); + + Map results = Maps.newHashMap(); + + BulkFuture> future; + try { + future = clientHolder.get().asyncGetBulk(keyLookup.keySet()); + } + catch (IllegalStateException e) { + // operation did not get queued in time (queue is full) + errorCount.incrementAndGet(); + log.warn(e, "Unable to queue cache operation"); + return results; + } + + try { + Map some = future.getSome(timeout, TimeUnit.MILLISECONDS); + + if (future.isTimeout()) { + future.cancel(false); + timeoutCount.incrementAndGet(); } - ); + missCount.addAndGet(keyLookup.size() - some.size()); + hitCount.addAndGet(some.size()); - Map results = Maps.newHashMap(); + for (Map.Entry entry : some.entrySet()) { + final NamedKey key = keyLookup.get(entry.getKey()); + final byte[] value = (byte[]) entry.getValue(); + results.put( + key, + value == null ? null : deserializeValue(key, value) + ); + } - BulkFuture> future; - try { - future = client.asyncGetBulk(keyLookup.keySet()); - } - catch (IllegalStateException e) { - // operation did not get queued in time (queue is full) - errorCount.incrementAndGet(); - log.warn(e, "Unable to queue cache operation"); - return results; - } - - try { - Map some = future.getSome(timeout, TimeUnit.MILLISECONDS); - - if (future.isTimeout()) { - future.cancel(false); - timeoutCount.incrementAndGet(); + return results; } - missCount.addAndGet(keyLookup.size() - some.size()); - hitCount.addAndGet(some.size()); - - for (Map.Entry entry : some.entrySet()) { - final NamedKey key = keyLookup.get(entry.getKey()); - final byte[] value = (byte[]) entry.getValue(); - results.put( - key, - value == null ? null : deserializeValue(key, value) - ); + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw Throwables.propagate(e); + } + catch (ExecutionException e) { + errorCount.incrementAndGet(); + log.warn(e, "Exception pulling item from cache"); + return results; } - - return results; } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); + catch (IOException e) { throw Throwables.propagate(e); } - catch (ExecutionException e) { - errorCount.incrementAndGet(); - log.warn(e, "Exception pulling item from cache"); - return results; - } } @Override diff --git a/server/src/main/java/io/druid/client/cache/MemcachedCacheConfig.java b/server/src/main/java/io/druid/client/cache/MemcachedCacheConfig.java index 8e8cc708179..cd712076e2f 100644 --- a/server/src/main/java/io/druid/client/cache/MemcachedCacheConfig.java +++ b/server/src/main/java/io/druid/client/cache/MemcachedCacheConfig.java @@ -51,6 +51,10 @@ public class MemcachedCacheConfig @JsonProperty private long maxOperationQueueSize = 0; + // size of memcached connection pool + @JsonProperty + private int numConnections = 1; + public int getExpiration() { return expiration; @@ -85,4 +89,9 @@ public class MemcachedCacheConfig { return readBufferSize; } + + public int getNumConnections() + { + return numConnections; + } } diff --git a/server/src/test/java/io/druid/client/cache/MemcachedCacheBenchmark.java b/server/src/test/java/io/druid/client/cache/MemcachedCacheBenchmark.java index cbd850eb2b8..db405ff7f30 100644 --- a/server/src/test/java/io/druid/client/cache/MemcachedCacheBenchmark.java +++ b/server/src/test/java/io/druid/client/cache/MemcachedCacheBenchmark.java @@ -20,7 +20,10 @@ package io.druid.client.cache; import com.google.caliper.Param; import com.google.caliper.Runner; import com.google.caliper.SimpleBenchmark; +import com.google.common.base.Suppliers; import com.google.common.collect.Lists; +import io.druid.collections.ResourceHolder; +import io.druid.collections.StupidResourceHolder; import net.spy.memcached.AddrUtil; import net.spy.memcached.ConnectionFactoryBuilder; import net.spy.memcached.DefaultHashAlgorithm; @@ -77,7 +80,9 @@ public class MemcachedCacheBenchmark extends SimpleBenchmark cache = new MemcachedCache( - client, + Suppliers.>ofInstance( + StupidResourceHolder.create(client) + ), new MemcachedCacheConfig() { @Override diff --git a/server/src/test/java/io/druid/client/cache/MemcachedCacheTest.java b/server/src/test/java/io/druid/client/cache/MemcachedCacheTest.java index 44b8916bc43..77631141aaa 100644 --- a/server/src/test/java/io/druid/client/cache/MemcachedCacheTest.java +++ b/server/src/test/java/io/druid/client/cache/MemcachedCacheTest.java @@ -18,6 +18,7 @@ package io.druid.client.cache; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -34,6 +35,8 @@ import com.metamx.emitter.core.Event; import com.metamx.emitter.service.ServiceEmitter; import com.metamx.metrics.Monitor; import com.metamx.metrics.MonitorScheduler; +import io.druid.collections.ResourceHolder; +import io.druid.collections.StupidResourceHolder; import io.druid.guice.GuiceInjectors; import io.druid.guice.ManageLifecycle; import io.druid.initialization.Initialization; @@ -110,8 +113,12 @@ public class MemcachedCacheTest @Before public void setUp() throws Exception { - MemcachedClientIF client = new MockMemcachedClient(); - cache = new MemcachedCache(client, memcachedCacheConfig); + cache = new MemcachedCache( + Suppliers.>ofInstance( + StupidResourceHolder.create(new MockMemcachedClient()) + ), + memcachedCacheConfig + ); } @Test