Merge pull request #1450 from metamx/cache-key-distribution

Fix bad distribution of cache keys across cache nodes
This commit is contained in:
Fangjin Yang 2015-06-22 17:58:15 -07:00
commit 0c13fd8018
4 changed files with 384 additions and 14 deletions

View File

@ -431,7 +431,7 @@
<dependency>
<groupId>net.spy</groupId>
<artifactId>spymemcached</artifactId>
<version>2.11.4</version>
<version>2.11.7</version>
</dependency>
<dependency>
<groupId>org.antlr</groupId>

View File

@ -17,16 +17,19 @@
package io.druid.client.cache;
import com.google.common.base.Charsets;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import com.google.common.primitives.Ints;
import com.metamx.common.logger.Logger;
import net.spy.memcached.AddrUtil;
import net.spy.memcached.ConnectionFactoryBuilder;
import net.spy.memcached.DefaultHashAlgorithm;
import net.spy.memcached.FailureMode;
import net.spy.memcached.HashAlgorithm;
import net.spy.memcached.MemcachedClient;
import net.spy.memcached.MemcachedClientIF;
import net.spy.memcached.internal.BulkFuture;
@ -49,6 +52,23 @@ public class MemcachedCache implements Cache
{
private static final Logger log = new Logger(MemcachedCache.class);
final static HashAlgorithm MURMUR3_128 = new HashAlgorithm()
{
final HashFunction fn = Hashing.murmur3_128();
@Override
public long hash(String k)
{
return fn.hashString(k, Charsets.UTF_8).asLong();
}
@Override
public String toString()
{
return fn.toString();
}
};
public static MemcachedCache create(final MemcachedCacheConfig config)
{
try {
@ -67,18 +87,22 @@ public class MemcachedCache implements Cache
return new MemcachedCache(
new MemcachedClient(
new ConnectionFactoryBuilder().setProtocol(ConnectionFactoryBuilder.Protocol.BINARY)
.setHashAlg(DefaultHashAlgorithm.FNV1A_64_HASH)
.setLocatorType(ConnectionFactoryBuilder.Locator.CONSISTENT)
.setDaemon(true)
.setFailureMode(FailureMode.Cancel)
.setTranscoder(transcoder)
.setShouldOptimize(true)
.setOpQueueMaxBlockTime(config.getTimeout())
.setOpTimeout(config.getTimeout())
.setReadBufferSize(config.getReadBufferSize())
.setOpQueueFactory(opQueueFactory)
.build(),
new MemcachedCustomConnectionFactoryBuilder()
// 1000 repetitions gives us good distribution with murmur3_128
// (approx < 5% difference in counts across nodes, with 5 cache nodes)
.setKetamaNodeRepetitions(1000)
.setHashAlg(MURMUR3_128)
.setProtocol(ConnectionFactoryBuilder.Protocol.BINARY)
.setLocatorType(ConnectionFactoryBuilder.Locator.CONSISTENT)
.setDaemon(true)
.setFailureMode(FailureMode.Cancel)
.setTranscoder(transcoder)
.setShouldOptimize(true)
.setOpQueueMaxBlockTime(config.getTimeout())
.setOpTimeout(config.getTimeout())
.setReadBufferSize(config.getReadBufferSize())
.setOpQueueFactory(opQueueFactory)
.build(),
AddrUtil.getAddresses(config.getHosts())
),
config

View File

@ -0,0 +1,197 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.client.cache;
import net.spy.memcached.ArrayModNodeLocator;
import net.spy.memcached.ConnectionFactory;
import net.spy.memcached.ConnectionFactoryBuilder;
import net.spy.memcached.ConnectionObserver;
import net.spy.memcached.DefaultConnectionFactory;
import net.spy.memcached.FailureMode;
import net.spy.memcached.HashAlgorithm;
import net.spy.memcached.KetamaNodeLocator;
import net.spy.memcached.MemcachedNode;
import net.spy.memcached.NodeLocator;
import net.spy.memcached.OperationFactory;
import net.spy.memcached.auth.AuthDescriptor;
import net.spy.memcached.metrics.MetricCollector;
import net.spy.memcached.metrics.MetricType;
import net.spy.memcached.ops.Operation;
import net.spy.memcached.transcoders.Transcoder;
import net.spy.memcached.util.DefaultKetamaNodeLocatorConfiguration;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
class MemcachedCustomConnectionFactoryBuilder extends ConnectionFactoryBuilder
{
private int repetitions = new DefaultKetamaNodeLocatorConfiguration().getNodeRepetitions();
public MemcachedCustomConnectionFactoryBuilder setKetamaNodeRepetitions(int repetitions)
{
this.repetitions = repetitions;
return this;
}
// borrowed from ConnectionFactoryBuilder to allow setting number of repetitions for KetamaNodeLocator
@Override
public ConnectionFactory build()
{
return new DefaultConnectionFactory() {
@Override
public NodeLocator createLocator(List<MemcachedNode> nodes) {
switch (locator) {
case ARRAY_MOD:
return new ArrayModNodeLocator(nodes, getHashAlg());
case CONSISTENT:
return new KetamaNodeLocator(
nodes,
getHashAlg(),
new DefaultKetamaNodeLocatorConfiguration()
{
@Override
public int getNodeRepetitions()
{
return repetitions;
}
}
);
default:
throw new IllegalStateException("Unhandled locator type: " + locator);
}
}
@Override
public BlockingQueue<Operation> createOperationQueue() {
return opQueueFactory == null ? super.createOperationQueue()
: opQueueFactory.create();
}
@Override
public BlockingQueue<Operation> createReadOperationQueue() {
return readQueueFactory == null ? super.createReadOperationQueue()
: readQueueFactory.create();
}
@Override
public BlockingQueue<Operation> createWriteOperationQueue() {
return writeQueueFactory == null ? super.createReadOperationQueue()
: writeQueueFactory.create();
}
@Override
public Transcoder<Object> getDefaultTranscoder() {
return transcoder == null ? super.getDefaultTranscoder() : transcoder;
}
@Override
public FailureMode getFailureMode() {
return failureMode == null ? super.getFailureMode() : failureMode;
}
@Override
public HashAlgorithm getHashAlg() {
return hashAlg == null ? super.getHashAlg() : hashAlg;
}
public Collection<ConnectionObserver> getInitialObservers() {
return initialObservers;
}
@Override
public OperationFactory getOperationFactory() {
return opFact == null ? super.getOperationFactory() : opFact;
}
@Override
public long getOperationTimeout() {
return opTimeout == -1 ? super.getOperationTimeout() : opTimeout;
}
@Override
public int getReadBufSize() {
return readBufSize == -1 ? super.getReadBufSize() : readBufSize;
}
@Override
public boolean isDaemon() {
return isDaemon;
}
@Override
public boolean shouldOptimize() {
return shouldOptimize;
}
@Override
public boolean useNagleAlgorithm() {
return useNagle;
}
@Override
public long getMaxReconnectDelay() {
return maxReconnectDelay;
}
@Override
public AuthDescriptor getAuthDescriptor() {
return authDescriptor;
}
@Override
public long getOpQueueMaxBlockTime() {
return opQueueMaxBlockTime > -1 ? opQueueMaxBlockTime
: super.getOpQueueMaxBlockTime();
}
@Override
public int getTimeoutExceptionThreshold() {
return timeoutExceptionThreshold;
}
@Override
public MetricType enableMetrics() {
return metricType == null ? super.enableMetrics() : metricType;
}
@Override
public MetricCollector getMetricCollector() {
return collector == null ? super.getMetricCollector() : collector;
}
@Override
public ExecutorService getListenerExecutorService() {
return executorService == null ? super.getListenerExecutorService() : executorService;
}
@Override
public boolean isDefaultExecutorService() {
return executorService == null;
}
@Override
public long getAuthWaitTime() {
return authWaitTime;
}
};
}
}

View File

@ -0,0 +1,149 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.client.cache;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import net.spy.memcached.DefaultHashAlgorithm;
import net.spy.memcached.HashAlgorithm;
import net.spy.memcached.KetamaNodeLocator;
import net.spy.memcached.MemcachedNode;
import net.spy.memcached.util.DefaultKetamaNodeLocatorConfiguration;
import org.apache.commons.codec.digest.DigestUtils;
import org.easymock.EasyMock;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import javax.annotation.Nullable;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
@RunWith(Parameterized.class)
public class CacheDistributionTest
{
public static final int KEY_COUNT = 1_000_000;
@Parameterized.Parameters(name = "repetitions={0}, hash={1}")
public static Iterable<Object[]> data() {
List<HashAlgorithm> hash = ImmutableList.of(
DefaultHashAlgorithm.FNV1A_64_HASH, DefaultHashAlgorithm.KETAMA_HASH, MemcachedCache.MURMUR3_128
);
List<Integer> repetitions = Arrays.asList(160, 500, 1000, 2500, 5000);
Set<List<Object>> values = Sets.cartesianProduct(
Sets.newLinkedHashSet(hash),
Sets.newLinkedHashSet(repetitions)
);
return Iterables.transform(
values, new Function<List<Object>, Object[]>()
{
@Nullable
@Override
public Object[] apply(List<Object> input)
{
return input.toArray();
}
}
);
}
final HashAlgorithm hash;
final int reps;
@BeforeClass
public static void header() {
System.out.printf(
"%25s\t%5s\t%10s\t%10s\t%10s\t%10s\t%10s\t%7s\t%5s\n",
"hash", "reps", "node 1", "node 2", "node 3", "node 4", "node 5", "min/max", "ns"
);
}
public CacheDistributionTest(final HashAlgorithm hash, final int reps)
{
this.hash = hash;
this.reps = reps;
}
// run to get a sense of cache key distribution for different ketama reps / hash functions
@Test
public void testDistribution() throws Exception
{
KetamaNodeLocator locator = new KetamaNodeLocator(
ImmutableList.of(
dummyNode("druid-cache.0001", 11211),
dummyNode("druid-cache.0002", 11211),
dummyNode("druid-cache.0003", 11211),
dummyNode("druid-cache.0004", 11211),
dummyNode("druid-cache.0005", 11211)
),
hash,
new DefaultKetamaNodeLocatorConfiguration()
{
@Override
public int getNodeRepetitions()
{
return reps;
}
}
);
Map<MemcachedNode, AtomicLong> counter = Maps.newHashMap();
long t = 0;
for(int i = 0; i < KEY_COUNT; ++i) {
final String k = DigestUtils.sha1Hex("abc" + i) + ":" + DigestUtils.sha1Hex("xyz" + i);
long t0 = System.nanoTime();
MemcachedNode node = locator.getPrimary(k);
t += System.nanoTime() - t0;
if(counter.containsKey(node)) {
counter.get(node).incrementAndGet();
} else {
counter.put(node, new AtomicLong(1));
}
}
long min = Long.MAX_VALUE;
long max = 0;
System.out.printf("%25s\t%5d\t", hash, reps);
for(AtomicLong count : counter.values()) {
System.out.printf("%10d\t", count.get());
min = Math.min(min, count.get());
max = Math.max(max, count.get());
}
System.out.printf("%7.2f\t%5.0f\n", (double) min / (double) max, (double)t / KEY_COUNT);
}
private static MemcachedNode dummyNode(String host, int port) {
SocketAddress address = new InetSocketAddress(host, port);
MemcachedNode node = EasyMock.createNiceMock(MemcachedNode.class);
EasyMock.expect(node.getSocketAddress()).andReturn(address).anyTimes();
EasyMock.replay(node);
return node;
}
}