diff --git a/pom.xml b/pom.xml index 1bfb1e31dc7..5f15bec2533 100644 --- a/pom.xml +++ b/pom.xml @@ -431,7 +431,7 @@ net.spy spymemcached - 2.11.4 + 2.11.7 org.antlr diff --git a/server/src/main/java/io/druid/client/cache/MemcachedCache.java b/server/src/main/java/io/druid/client/cache/MemcachedCache.java index 42e7e8023a2..96abb239aff 100644 --- a/server/src/main/java/io/druid/client/cache/MemcachedCache.java +++ b/server/src/main/java/io/druid/client/cache/MemcachedCache.java @@ -17,16 +17,19 @@ package io.druid.client.cache; +import com.google.common.base.Charsets; import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.collect.Maps; +import com.google.common.hash.HashFunction; +import com.google.common.hash.Hashing; import com.google.common.primitives.Ints; import com.metamx.common.logger.Logger; import net.spy.memcached.AddrUtil; import net.spy.memcached.ConnectionFactoryBuilder; -import net.spy.memcached.DefaultHashAlgorithm; import net.spy.memcached.FailureMode; +import net.spy.memcached.HashAlgorithm; import net.spy.memcached.MemcachedClient; import net.spy.memcached.MemcachedClientIF; import net.spy.memcached.internal.BulkFuture; @@ -49,6 +52,23 @@ public class MemcachedCache implements Cache { private static final Logger log = new Logger(MemcachedCache.class); + final static HashAlgorithm MURMUR3_128 = new HashAlgorithm() + { + final HashFunction fn = Hashing.murmur3_128(); + + @Override + public long hash(String k) + { + return fn.hashString(k, Charsets.UTF_8).asLong(); + } + + @Override + public String toString() + { + return fn.toString(); + } + }; + public static MemcachedCache create(final MemcachedCacheConfig config) { try { @@ -67,18 +87,22 @@ public class MemcachedCache implements Cache return new MemcachedCache( new MemcachedClient( - new ConnectionFactoryBuilder().setProtocol(ConnectionFactoryBuilder.Protocol.BINARY) - .setHashAlg(DefaultHashAlgorithm.FNV1A_64_HASH) - .setLocatorType(ConnectionFactoryBuilder.Locator.CONSISTENT) - .setDaemon(true) - .setFailureMode(FailureMode.Cancel) - .setTranscoder(transcoder) - .setShouldOptimize(true) - .setOpQueueMaxBlockTime(config.getTimeout()) - .setOpTimeout(config.getTimeout()) - .setReadBufferSize(config.getReadBufferSize()) - .setOpQueueFactory(opQueueFactory) - .build(), + new MemcachedCustomConnectionFactoryBuilder() + // 1000 repetitions gives us good distribution with murmur3_128 + // (approx < 5% difference in counts across nodes, with 5 cache nodes) + .setKetamaNodeRepetitions(1000) + .setHashAlg(MURMUR3_128) + .setProtocol(ConnectionFactoryBuilder.Protocol.BINARY) + .setLocatorType(ConnectionFactoryBuilder.Locator.CONSISTENT) + .setDaemon(true) + .setFailureMode(FailureMode.Cancel) + .setTranscoder(transcoder) + .setShouldOptimize(true) + .setOpQueueMaxBlockTime(config.getTimeout()) + .setOpTimeout(config.getTimeout()) + .setReadBufferSize(config.getReadBufferSize()) + .setOpQueueFactory(opQueueFactory) + .build(), AddrUtil.getAddresses(config.getHosts()) ), config diff --git a/server/src/main/java/io/druid/client/cache/MemcachedCustomConnectionFactoryBuilder.java b/server/src/main/java/io/druid/client/cache/MemcachedCustomConnectionFactoryBuilder.java new file mode 100644 index 00000000000..e62929532f9 --- /dev/null +++ b/server/src/main/java/io/druid/client/cache/MemcachedCustomConnectionFactoryBuilder.java @@ -0,0 +1,197 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.client.cache; + +import net.spy.memcached.ArrayModNodeLocator; +import net.spy.memcached.ConnectionFactory; +import net.spy.memcached.ConnectionFactoryBuilder; +import net.spy.memcached.ConnectionObserver; +import net.spy.memcached.DefaultConnectionFactory; +import net.spy.memcached.FailureMode; +import net.spy.memcached.HashAlgorithm; +import net.spy.memcached.KetamaNodeLocator; +import net.spy.memcached.MemcachedNode; +import net.spy.memcached.NodeLocator; +import net.spy.memcached.OperationFactory; +import net.spy.memcached.auth.AuthDescriptor; +import net.spy.memcached.metrics.MetricCollector; +import net.spy.memcached.metrics.MetricType; +import net.spy.memcached.ops.Operation; +import net.spy.memcached.transcoders.Transcoder; +import net.spy.memcached.util.DefaultKetamaNodeLocatorConfiguration; + +import java.util.Collection; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; + +class MemcachedCustomConnectionFactoryBuilder extends ConnectionFactoryBuilder +{ + private int repetitions = new DefaultKetamaNodeLocatorConfiguration().getNodeRepetitions(); + + public MemcachedCustomConnectionFactoryBuilder setKetamaNodeRepetitions(int repetitions) + { + this.repetitions = repetitions; + return this; + } + + // borrowed from ConnectionFactoryBuilder to allow setting number of repetitions for KetamaNodeLocator + @Override + public ConnectionFactory build() + { + return new DefaultConnectionFactory() { + @Override + public NodeLocator createLocator(List nodes) { + switch (locator) { + case ARRAY_MOD: + return new ArrayModNodeLocator(nodes, getHashAlg()); + case CONSISTENT: + return new KetamaNodeLocator( + nodes, + getHashAlg(), + new DefaultKetamaNodeLocatorConfiguration() + { + @Override + public int getNodeRepetitions() + { + return repetitions; + } + } + ); + default: + throw new IllegalStateException("Unhandled locator type: " + locator); + } + } + + @Override + public BlockingQueue createOperationQueue() { + return opQueueFactory == null ? super.createOperationQueue() + : opQueueFactory.create(); + } + + @Override + public BlockingQueue createReadOperationQueue() { + return readQueueFactory == null ? super.createReadOperationQueue() + : readQueueFactory.create(); + } + + @Override + public BlockingQueue createWriteOperationQueue() { + return writeQueueFactory == null ? super.createReadOperationQueue() + : writeQueueFactory.create(); + } + + @Override + public Transcoder getDefaultTranscoder() { + return transcoder == null ? super.getDefaultTranscoder() : transcoder; + } + + @Override + public FailureMode getFailureMode() { + return failureMode == null ? super.getFailureMode() : failureMode; + } + + @Override + public HashAlgorithm getHashAlg() { + return hashAlg == null ? super.getHashAlg() : hashAlg; + } + + public Collection getInitialObservers() { + return initialObservers; + } + + @Override + public OperationFactory getOperationFactory() { + return opFact == null ? super.getOperationFactory() : opFact; + } + + @Override + public long getOperationTimeout() { + return opTimeout == -1 ? super.getOperationTimeout() : opTimeout; + } + + @Override + public int getReadBufSize() { + return readBufSize == -1 ? super.getReadBufSize() : readBufSize; + } + + @Override + public boolean isDaemon() { + return isDaemon; + } + + @Override + public boolean shouldOptimize() { + return shouldOptimize; + } + + @Override + public boolean useNagleAlgorithm() { + return useNagle; + } + + @Override + public long getMaxReconnectDelay() { + return maxReconnectDelay; + } + + @Override + public AuthDescriptor getAuthDescriptor() { + return authDescriptor; + } + + @Override + public long getOpQueueMaxBlockTime() { + return opQueueMaxBlockTime > -1 ? opQueueMaxBlockTime + : super.getOpQueueMaxBlockTime(); + } + + @Override + public int getTimeoutExceptionThreshold() { + return timeoutExceptionThreshold; + } + + @Override + public MetricType enableMetrics() { + return metricType == null ? super.enableMetrics() : metricType; + } + + @Override + public MetricCollector getMetricCollector() { + return collector == null ? super.getMetricCollector() : collector; + } + + @Override + public ExecutorService getListenerExecutorService() { + return executorService == null ? super.getListenerExecutorService() : executorService; + } + + @Override + public boolean isDefaultExecutorService() { + return executorService == null; + } + + @Override + public long getAuthWaitTime() { + return authWaitTime; + } + }; + } +} diff --git a/server/src/test/java/io/druid/client/cache/CacheDistributionTest.java b/server/src/test/java/io/druid/client/cache/CacheDistributionTest.java new file mode 100644 index 00000000000..56ae1668b0a --- /dev/null +++ b/server/src/test/java/io/druid/client/cache/CacheDistributionTest.java @@ -0,0 +1,149 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.client.cache; + +import com.google.common.base.Function; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import net.spy.memcached.DefaultHashAlgorithm; +import net.spy.memcached.HashAlgorithm; +import net.spy.memcached.KetamaNodeLocator; +import net.spy.memcached.MemcachedNode; +import net.spy.memcached.util.DefaultKetamaNodeLocatorConfiguration; +import org.apache.commons.codec.digest.DigestUtils; +import org.easymock.EasyMock; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import javax.annotation.Nullable; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; + +@RunWith(Parameterized.class) +public class CacheDistributionTest +{ + public static final int KEY_COUNT = 1_000_000; + + @Parameterized.Parameters(name = "repetitions={0}, hash={1}") + public static Iterable data() { + List hash = ImmutableList.of( + DefaultHashAlgorithm.FNV1A_64_HASH, DefaultHashAlgorithm.KETAMA_HASH, MemcachedCache.MURMUR3_128 + ); + List repetitions = Arrays.asList(160, 500, 1000, 2500, 5000); + + Set> values = Sets.cartesianProduct( + Sets.newLinkedHashSet(hash), + Sets.newLinkedHashSet(repetitions) + ); + return Iterables.transform( + values, new Function, Object[]>() + { + @Nullable + @Override + public Object[] apply(List input) + { + return input.toArray(); + } + } + ); + } + + final HashAlgorithm hash; + final int reps; + + @BeforeClass + public static void header() { + System.out.printf( + "%25s\t%5s\t%10s\t%10s\t%10s\t%10s\t%10s\t%7s\t%5s\n", + "hash", "reps", "node 1", "node 2", "node 3", "node 4", "node 5", "min/max", "ns" + ); + } + + public CacheDistributionTest(final HashAlgorithm hash, final int reps) + { + this.hash = hash; + this.reps = reps; + } + + // run to get a sense of cache key distribution for different ketama reps / hash functions + @Test + public void testDistribution() throws Exception + { + KetamaNodeLocator locator = new KetamaNodeLocator( + ImmutableList.of( + dummyNode("druid-cache.0001", 11211), + dummyNode("druid-cache.0002", 11211), + dummyNode("druid-cache.0003", 11211), + dummyNode("druid-cache.0004", 11211), + dummyNode("druid-cache.0005", 11211) + ), + hash, + new DefaultKetamaNodeLocatorConfiguration() + { + @Override + public int getNodeRepetitions() + { + return reps; + } + } + ); + + Map counter = Maps.newHashMap(); + long t = 0; + for(int i = 0; i < KEY_COUNT; ++i) { + final String k = DigestUtils.sha1Hex("abc" + i) + ":" + DigestUtils.sha1Hex("xyz" + i); + long t0 = System.nanoTime(); + MemcachedNode node = locator.getPrimary(k); + t += System.nanoTime() - t0; + if(counter.containsKey(node)) { + counter.get(node).incrementAndGet(); + } else { + counter.put(node, new AtomicLong(1)); + } + } + + long min = Long.MAX_VALUE; + long max = 0; + System.out.printf("%25s\t%5d\t", hash, reps); + for(AtomicLong count : counter.values()) { + System.out.printf("%10d\t", count.get()); + min = Math.min(min, count.get()); + max = Math.max(max, count.get()); + } + System.out.printf("%7.2f\t%5.0f\n", (double) min / (double) max, (double)t / KEY_COUNT); + } + + private static MemcachedNode dummyNode(String host, int port) { + SocketAddress address = new InetSocketAddress(host, port); + MemcachedNode node = EasyMock.createNiceMock(MemcachedNode.class); + EasyMock.expect(node.getSocketAddress()).andReturn(address).anyTimes(); + EasyMock.replay(node); + return node; + } +}