Add Router connection balancers for Avatica queries (#4983)

* Add Router connection balancers for Avatica queries

* PR comments

* Adjust test bounds

* PR comments

* Add doc comments

* PR comments

* PR comment

* Checkstyle fix
This commit is contained in:
Jonathan Wei 2017-11-01 14:01:13 -07:00 committed by GitHub
parent 654cdc07f5
commit 6840eabd87
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 1155 additions and 4 deletions

View File

@ -0,0 +1,95 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.benchmark;
import com.google.common.collect.Sets;
import io.druid.java.util.common.StringUtils;
import io.druid.server.router.ConsistentHasher;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
/**
 * JMH benchmark that measures how long {@link ConsistentHasher#findKey} takes to assign a batch of random
 * UUID strings to a fixed set of ten server keys.
 */
@State(Scope.Benchmark)
@Fork(value = 1)
@Warmup(iterations = 15)
@Measurement(iterations = 30)
public class ConsistentHasherBenchmark
{
  // Number of random UUIDs hashed per benchmark invocation.
  @Param({"100000"})
  int numIds;

  ConsistentHasher hasher;
  List<String> uuids;
  Set<String> servers;

  /**
   * Builds the server set, registers it with a hasher that uses its default hash function, and generates
   * the UUID strings that {@link #hash} will look up.
   */
  @Setup
  public void setup() throws IOException
  {
    servers = Sets.newHashSet(
        "localhost:1",
        "localhost:2",
        "localhost:3",
        "localhost:4",
        "localhost:5",
        "localhost:6",
        "localhost:7",
        "localhost:8",
        "localhost:9",
        "localhost:10"
    );

    // Passing null makes the hasher fall back to its default hash function.
    hasher = new ConsistentHasher(null);
    hasher.updateKeys(servers);

    final List<String> generated = new ArrayList<>();
    for (int remaining = numIds; remaining > 0; remaining--) {
      generated.add(UUID.randomUUID().toString());
    }
    uuids = generated;
  }

  /**
   * Looks up the server for every generated UUID; results go to the blackhole so the work is not optimized away.
   */
  @Benchmark
  @BenchmarkMode(Mode.AverageTime)
  @OutputTimeUnit(TimeUnit.MICROSECONDS)
  public void hash(Blackhole blackhole) throws Exception
  {
    for (String connectionId : uuids) {
      blackhole.consume(hasher.findKey(StringUtils.toUtf8(connectionId)));
    }
  }
}

View File

@ -0,0 +1,93 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.benchmark;
import com.google.common.collect.Sets;
import io.druid.java.util.common.StringUtils;
import io.druid.server.router.RendezvousHasher;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
/**
 * JMH benchmark that measures how long {@link RendezvousHasher#chooseNode} takes to assign a batch of random
 * UUID strings to a fixed set of ten server keys.
 */
@State(Scope.Benchmark)
@Fork(value = 1)
@Warmup(iterations = 15)
@Measurement(iterations = 30)
public class RendezvousHasherBenchmark
{
  // Number of random UUIDs hashed per benchmark invocation.
  @Param({"100000"})
  int numIds;

  RendezvousHasher hasher;
  List<String> uuids;
  Set<String> servers;

  /**
   * Creates the hasher, the server set, and the UUID strings that {@link #hash} will assign to servers.
   */
  @Setup
  public void setup() throws IOException
  {
    servers = Sets.newHashSet(
        "localhost:1",
        "localhost:2",
        "localhost:3",
        "localhost:4",
        "localhost:5",
        "localhost:6",
        "localhost:7",
        "localhost:8",
        "localhost:9",
        "localhost:10"
    );

    hasher = new RendezvousHasher();

    final List<String> generated = new ArrayList<>();
    for (int remaining = numIds; remaining > 0; remaining--) {
      generated.add(UUID.randomUUID().toString());
    }
    uuids = generated;
  }

  /**
   * Chooses a server for every generated UUID; results go to the blackhole so the work is not optimized away.
   */
  @Benchmark
  @BenchmarkMode(Mode.AverageTime)
  @OutputTimeUnit(TimeUnit.MICROSECONDS)
  public void hash(Blackhole blackhole) throws Exception
  {
    for (String connectionId : uuids) {
      blackhole.consume(hasher.chooseNode(servers, StringUtils.toUtf8(connectionId)));
    }
  }
}

View File

@ -75,6 +75,7 @@ The router module uses several of the default modules in [Configuration](../conf
|`druid.router.coordinatorServiceName`|Any string.|The service discovery name of the coordinator.|druid/coordinator|
|`druid.router.pollPeriod`|Any ISO8601 duration.|How often to poll for new rules.|PT1M|
|`druid.router.strategies`|An ordered JSON array of objects.|All custom strategies to use for routing.|[{"type":"timeBoundary"},{"type":"priority"}]|
|`druid.router.avatica.balancer.type`|String representing an AvaticaConnectionBalancer name|Class to use for balancing Avatica queries across brokers|rendezvousHash|
Router Strategies
-----------------
@ -119,6 +120,40 @@ Allows defining arbitrary routing rules using a JavaScript function. The functio
JavaScript-based functionality is disabled by default. Please refer to the Druid <a href="../development/javascript.html">JavaScript programming guide</a> for guidelines about using Druid's JavaScript functionality, including instructions on how to enable it.
</div>
Avatica Query Balancing
--------------
All Avatica JDBC requests with a given connection ID must be routed to the same broker, since Druid brokers do not share connection state with each other.
To accomplish this, Druid provides two built-in balancers that use rendezvous hashing and consistent hashing of a request's connection ID respectively to assign requests to brokers.
Note that when multiple routers are used, all routers should have identical balancer configuration to ensure that they make the same routing decisions.
### Rendezvous Hash Balancer
This balancer uses [Rendezvous Hashing](https://en.wikipedia.org/wiki/Rendezvous_hashing) on an Avatica request's connection ID to assign the request to a broker.
To use this balancer, specify the following property:
```
druid.router.avatica.balancer.type=rendezvousHash
```
If no `druid.router.avatica.balancer.type` property is set, the Router defaults to the Rendezvous Hash Balancer.
### Consistent Hash Balancer
This balancer uses [Consistent Hashing](https://en.wikipedia.org/wiki/Consistent_hashing) on an Avatica request's connection ID to assign the request to a broker.
To use this balancer, specify the following property:
```
druid.router.avatica.balancer.type=consistentHash
```
This is a non-default implementation that is provided for experimentation purposes. The consistent hasher has longer setup times on initialization and when the set of brokers changes, but has a faster broker assignment time than the rendezvous hasher when tested with 5 brokers. Benchmarks for both implementations have been provided in `ConsistentHasherBenchmark` and `RendezvousHasherBenchmark`. The consistent hasher also requires locking, while the rendezvous hasher does not.
HTTP Endpoints
--------------

View File

@ -388,10 +388,16 @@ Table metadata is available over JDBC using `connection.getMetaData()` or by que
["INFORMATION_SCHEMA" tables](#retrieving-metadata). Parameterized queries (using `?` or other placeholders) don't work properly,
so avoid those.
#### Connection Stickiness
Druid's JDBC server does not share connection state between brokers. This means that if you're using JDBC and have
multiple Druid brokers, you should either connect to a specific broker, or use a load balancer with sticky sessions
enabled.
The Druid Router node provides connection stickiness when balancing JDBC requests. Please see the [Router](../development/router.html) documentation for more details.
Note that the non-JDBC [JSON over HTTP](#json-over-http) API is stateless and does not require stickiness.
### Connection context
Druid SQL supports setting connection parameters on the client. The parameters in the table below affect SQL planning.

View File

@ -33,6 +33,7 @@ import io.druid.guice.annotations.Json;
import io.druid.guice.annotations.Smile;
import io.druid.guice.http.DruidHttpClientConfig;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.IAE;
import io.druid.query.DruidMetrics;
import io.druid.query.GenericQueryMetricsFactory;
import io.druid.query.Query;
@ -62,6 +63,7 @@ import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLDecoder;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@ -78,6 +80,7 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu
private static final String HOST_ATTRIBUTE = "io.druid.proxy.to.host";
private static final String SCHEME_ATTRIBUTE = "io.druid.proxy.to.host.scheme";
private static final String QUERY_ATTRIBUTE = "io.druid.proxy.query";
private static final String AVATICA_QUERY_ATTRIBUTE = "io.druid.proxy.avaticaQuery";
private static final String OBJECTMAPPER_ATTRIBUTE = "io.druid.proxy.objectMapper";
private static final int CANCELLATION_TIMEOUT_MILLIS = 500;
@ -186,7 +189,17 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu
final boolean isQueryEndpoint = request.getRequestURI().startsWith("/druid/v2")
&& !request.getRequestURI().startsWith("/druid/v2/sql");
if (isQueryEndpoint && HttpMethod.DELETE.is(request.getMethod())) {
final boolean isAvatica = request.getRequestURI().startsWith("/druid/v2/sql/avatica");
if (isAvatica) {
Map<String, Object> requestMap = objectMapper.readValue(request.getInputStream(), Map.class);
String connectionId = getAvaticaConnectionId(requestMap);
Server targetServer = hostFinder.findServerAvatica(connectionId);
byte[] requestBytes = objectMapper.writeValueAsBytes(requestMap);
request.setAttribute(HOST_ATTRIBUTE, targetServer.getHost());
request.setAttribute(SCHEME_ATTRIBUTE, targetServer.getScheme());
request.setAttribute(AVATICA_QUERY_ATTRIBUTE, requestBytes);
} else if (isQueryEndpoint && HttpMethod.DELETE.is(request.getMethod())) {
// query cancellation request
for (final Server server: hostFinder.getAllServers()) {
// send query cancellation to all brokers this query may have gone to
@ -263,6 +276,11 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu
proxyRequest.timeout(httpClientConfig.getReadTimeout().getMillis(), TimeUnit.MILLISECONDS);
proxyRequest.idleTimeout(httpClientConfig.getReadTimeout().getMillis(), TimeUnit.MILLISECONDS);
byte[] avaticaQuery = (byte[]) clientRequest.getAttribute(AVATICA_QUERY_ATTRIBUTE);
if (avaticaQuery != null) {
proxyRequest.content(new BytesContentProvider(avaticaQuery));
}
final Query query = (Query) clientRequest.getAttribute(QUERY_ATTRIBUTE);
if (query != null) {
final ObjectMapper objectMapper = (ObjectMapper) clientRequest.getAttribute(OBJECTMAPPER_ATTRIBUTE);
@ -371,6 +389,18 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu
return interruptedQueryCount.get();
}
/**
 * Extracts the Avatica connection ID from a deserialized Avatica JSON request.
 *
 * Connection-level requests carry "connectionId" at the top level of the request object. Statement-level
 * requests (for example "execute") instead nest it inside the "statementHandle" object, so that location is
 * consulted when no top-level value is present.
 *
 * @param requestMap the Avatica request body, deserialized to a map
 * @return the connection ID of the request
 * @throws IAE if no connectionId is present in either location, or if the value found is not a String
 */
private static String getAvaticaConnectionId(Map<String, Object> requestMap) throws IOException
{
  Object connectionIdObj = requestMap.get("connectionId");
  if (connectionIdObj == null) {
    // Statement-scoped requests have no top-level connectionId; look inside the statement handle instead.
    Object statementHandle = requestMap.get("statementHandle");
    if (statementHandle instanceof Map) {
      connectionIdObj = ((Map<?, ?>) statementHandle).get("connectionId");
    }
  }
  if (connectionIdObj == null) {
    throw new IAE("Received an Avatica request without a connectionId.");
  }
  if (!(connectionIdObj instanceof String)) {
    throw new IAE("Received an Avatica request with a non-String connectionId.");
  }
  return (String) connectionIdObj;
}
private class MetricsEmittingProxyResponseListener extends ProxyResponseListener
{

View File

@ -0,0 +1,45 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server.router;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.druid.client.selector.Server;
import java.util.Collection;
/**
* An AvaticaConnectionBalancer balances Avatica connections across a collection of servers.
*
* The concrete implementation is selected by the JSON "type" property ("rendezvousHash" or "consistentHash");
* when no type is given, {@link RendezvousHashAvaticaConnectionBalancer} is used.
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = RendezvousHashAvaticaConnectionBalancer.class)
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "rendezvousHash", value = RendezvousHashAvaticaConnectionBalancer.class),
@JsonSubTypes.Type(name = "consistentHash", value = ConsistentHashAvaticaConnectionBalancer.class)
})
public interface AvaticaConnectionBalancer
{
/**
* Chooses the server that should handle requests for the given Avatica connection.
*
* @param servers Servers to balance across
* @param connectionId Connection ID to be balanced
* @return Server that connectionId should be assigned to. The process for choosing a server must be deterministic and
* sticky (with a fixed set of servers, the same connectionId should always be assigned to the same server).
* The built-in implementations return null when {@code servers} is empty, and QueryHostFinder treats a null
* result as "no server available".
*/
Server pickServer(Collection<Server> servers, String connectionId);
}

View File

@ -0,0 +1,50 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server.router;
import io.druid.client.selector.Server;
import io.druid.java.util.common.StringUtils;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
/**
 * {@link AvaticaConnectionBalancer} that assigns connection IDs to servers using a {@link ConsistentHasher}
 * keyed by server host.
 */
public class ConsistentHashAvaticaConnectionBalancer implements AvaticaConnectionBalancer
{
  // The hasher keeps mutable ring state, so every use of it happens while holding its monitor.
  private final ConsistentHasher hasher = new ConsistentHasher(null);

  /**
   * Picks the server whose host the consistent hasher selects for the connection ID.
   *
   * @param servers      candidate servers
   * @param connectionId Avatica connection ID to place
   * @return the chosen server, or null when {@code servers} is empty
   */
  @Override
  public Server pickServer(Collection<Server> servers, String connectionId)
  {
    synchronized (hasher) {
      if (!servers.isEmpty()) {
        final Map<String, Server> serversByHost = indexByHost(servers);
        // Bring the ring in line with the current server set before looking up the connection.
        hasher.updateKeys(serversByHost.keySet());
        final String chosenHost = hasher.findKey(StringUtils.toUtf8(connectionId));
        return serversByHost.get(chosenHost);
      }
      return null;
    }
  }

  // Maps each server's host string to the server; a later server with the same host replaces an earlier one.
  private static Map<String, Server> indexByHost(Collection<Server> servers)
  {
    final Map<String, Server> byHost = new HashMap<>();
    for (Server candidate : servers) {
      byHost.put(candidate.getHost(), candidate);
    }
    return byHost;
  }
}

View File

@ -0,0 +1,148 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server.router;
import com.google.common.base.Charsets;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectRBTreeMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectSortedMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
/**
 * Distributes objects across a set of node keys using consistent hashing.
 * See https://en.wikipedia.org/wiki/Consistent_hashing
 *
 * Each node key is placed on the ring at REPLICATION_FACTOR positions; an object is assigned to the node
 * owning the first ring position at or after the object's hash, wrapping around to the lowest position.
 *
 * Not thread-safe.
 */
public class ConsistentHasher
{
  // Determined through tests to provide reasonably equal balancing on a test set of 5-10 brokers
  private static final int REPLICATION_FACTOR = 128;
  private static final HashFunction DEFAULT_HASH_FN = Hashing.murmur3_128();

  // Ring position -> node key that owns the position.
  private final Long2ObjectRBTreeMap<String> nodeKeySlots = new Long2ObjectRBTreeMap<>();
  private final HashFunction hashFn;
  // Node key -> the ring positions registered for that key.
  private final Map<String, long[]> nodeKeyHashes = new HashMap<>();
  // Key set seen by the most recent updateKeys() call.
  private Set<String> previousKeys = new HashSet<>();

  /**
   * @param hashFunction hash function used for both node keys and objects; null selects murmur3_128
   */
  public ConsistentHasher(
      final HashFunction hashFunction
  )
  {
    nodeKeySlots.defaultReturnValue(null);
    if (hashFunction != null) {
      this.hashFn = hashFunction;
    } else {
      this.hashFn = DEFAULT_HASH_FN;
    }
  }

  /**
   * Replaces the set of node keys on the ring. Keys new to this call are added, then keys that are
   * no longer present are removed; keys present both before and after are left untouched.
   *
   * @param currentKeys the complete set of node keys that should be on the ring after this call
   */
  public void updateKeys(Set<String> currentKeys)
  {
    final Set<String> toAdd = new HashSet<>(currentKeys);
    toAdd.removeAll(previousKeys);

    final Set<String> toRemove = new HashSet<>(previousKeys);
    toRemove.removeAll(currentKeys);

    toAdd.forEach(this::addKey);
    toRemove.forEach(this::removeKey);

    // store a copy in case the input was immutable
    previousKeys = new HashSet<>(currentKeys);
  }

  /**
   * Finds the node key responsible for the given object.
   *
   * @param obj bytes identifying the object
   * @return the owning node key, or null when the ring has no keys
   */
  public String findKey(byte[] obj)
  {
    if (nodeKeySlots.isEmpty()) {
      return null;
    }

    final long objHash = hashFn.hashBytes(obj).asLong();
    final Long2ObjectSortedMap<String> atOrAfter = nodeKeySlots.tailMap(objHash);

    // Nothing at or after the object's hash means we wrap around to the start of the ring.
    final Long2ObjectSortedMap<String> searchSpace = atOrAfter.isEmpty() ? nodeKeySlots : atOrAfter;
    final Long2ObjectMap.Entry<String> owner = searchSpace.long2ObjectEntrySet().first();
    return owner.getValue();
  }

  // Registers a key's ring positions, unless the key is already registered.
  private void addKey(String key)
  {
    if (nodeKeyHashes.containsKey(key)) {
      return;
    }

    final long[] positions = new long[REPLICATION_FACTOR];
    int replica = 0;
    while (replica < REPLICATION_FACTOR) {
      positions[replica] = hashFn.hashString(key + "-" + replica, Charsets.UTF_8).asLong();
      replica++;
    }
    nodeKeyHashes.put(key, positions);

    for (long position : positions) {
      nodeKeySlots.put(position, key);
    }
  }

  // Drops every ring position registered for a key; a key that is not registered is ignored.
  private void removeKey(String key)
  {
    final long[] positions = nodeKeyHashes.remove(key);
    if (positions == null) {
      return;
    }

    for (long position : positions) {
      nodeKeySlots.remove(position);
    }
  }
}

View File

@ -38,15 +38,18 @@ public class QueryHostFinder
private static EmittingLogger log = new EmittingLogger(QueryHostFinder.class);
private final TieredBrokerHostSelector hostSelector;
private final AvaticaConnectionBalancer avaticaConnectionBalancer;
private final ConcurrentHashMap<String, Server> serverBackup = new ConcurrentHashMap<>();
@Inject
public QueryHostFinder(
TieredBrokerHostSelector hostSelector
TieredBrokerHostSelector hostSelector,
AvaticaConnectionBalancer avaticaConnectionBalancer
)
{
this.hostSelector = hostSelector;
this.avaticaConnectionBalancer = avaticaConnectionBalancer;
}
public <T> Server findServer(Query<T> query)
@ -68,6 +71,25 @@ public class QueryHostFinder
.collect(Collectors.toList());
}
/**
 * Finds the server that should receive an Avatica request, as chosen by the configured
 * AvaticaConnectionBalancer across all known servers.
 *
 * @param connectionId Avatica connection ID of the request
 * @return the chosen server, never null
 * @throws ISE if the balancer does not return a server; an alert is emitted first
 */
public Server findServerAvatica(String connectionId)
{
  final Server picked = avaticaConnectionBalancer.pickServer(getAllServers(), connectionId);

  if (picked != null) {
    log.debug(
        "Balancer class [%s] sending request with connectionId[%s] to server: %s",
        avaticaConnectionBalancer.getClass(),
        connectionId,
        picked.getHost()
    );
    return picked;
  }

  log.makeAlert("Catastrophic failure! No servers found at all! Failing request!").emit();
  throw new ISE("No server found for Avatica request with connectionId[%s]", connectionId);
}
public <T> Server getServer(Query<T> query)
{
Server server = findServer(query);

View File

@ -0,0 +1,47 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server.router;
import io.druid.client.selector.Server;
import io.druid.java.util.common.StringUtils;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
/**
 * {@link AvaticaConnectionBalancer} that assigns connection IDs to servers using a {@link RendezvousHasher}
 * keyed by server host.
 */
public class RendezvousHashAvaticaConnectionBalancer implements AvaticaConnectionBalancer
{
  private final RendezvousHasher hasher = new RendezvousHasher();

  /**
   * Picks the server whose host the rendezvous hasher selects for the connection ID.
   *
   * @param servers      candidate servers
   * @param connectionId Avatica connection ID to place
   * @return the chosen server, or null when {@code servers} is empty
   */
  @Override
  public Server pickServer(Collection<Server> servers, String connectionId)
  {
    if (!servers.isEmpty()) {
      // Index by host so the chosen host string can be mapped back to its server.
      final Map<String, Server> serversByHost = new HashMap<>();
      servers.forEach(candidate -> serversByHost.put(candidate.getHost(), candidate));

      final byte[] connectionBytes = StringUtils.toUtf8(connectionId);
      return serversByHost.get(hasher.chooseNode(serversByHost.keySet(), connectionBytes));
    }
    return null;
  }
}

View File

@ -0,0 +1,64 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server.router;
import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
import com.google.common.hash.HashCode;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import java.util.List;
import java.util.Set;
/**
 * Distributes objects across a set of node keys using rendezvous hashing
 * See https://en.wikipedia.org/wiki/Rendezvous_hashing
 *
 * For every candidate node, the node's hash is combined with the object's hash; the node with the largest
 * combined value wins.
 */
public class RendezvousHasher
{
  private static final HashFunction HASH_FN = Hashing.murmur3_128();

  /**
   * Chooses the node responsible for the given key.
   *
   * @param nodeIds candidate node identifiers
   * @param key     bytes identifying the object to place
   * @return the node with the highest combined hash, or null when {@code nodeIds} is empty
   */
  public <KeyType> String chooseNode(Set<String> nodeIds, byte[] key)
  {
    if (nodeIds.isEmpty()) {
      return null;
    }

    final HashCode keyHash = HASH_FN.hashBytes(key);

    String winner = null;
    long winningScore = Long.MIN_VALUE;

    for (String candidate : nodeIds) {
      final List<HashCode> pair = Lists.newArrayList(HASH_FN.hashString(candidate, Charsets.UTF_8), keyHash);
      final long score = Hashing.combineOrdered(pair).asLong();

      // The first candidate always becomes the winner, even if its score equals Long.MIN_VALUE;
      // afterwards only a strictly larger score displaces it.
      if (winner == null || score > winningScore) {
        winner = candidate;
        winningScore = score;
      }
    }

    return winner;
  }
}

View File

@ -50,6 +50,7 @@ import io.druid.server.initialization.jetty.JettyServerInitializer;
import io.druid.server.log.RequestLogger;
import io.druid.server.metrics.NoopServiceEmitter;
import io.druid.server.router.QueryHostFinder;
import io.druid.server.router.RendezvousHashAvaticaConnectionBalancer;
import io.druid.server.security.AllowAllAuthorizer;
import io.druid.server.security.AuthTestUtils;
import io.druid.server.security.Authorizer;
@ -208,7 +209,7 @@ public class AsyncQueryForwardingServletTest extends BaseJettyTest
final ServletContextHandler root = new ServletContextHandler(ServletContextHandler.SESSIONS);
root.addServlet(new ServletHolder(new DefaultServlet()), "/*");
final QueryHostFinder hostFinder = new QueryHostFinder(null)
final QueryHostFinder hostFinder = new QueryHostFinder(null, new RendezvousHashAvaticaConnectionBalancer())
{
@Override
public io.druid.client.selector.Server getServer(Query query)

View File

@ -0,0 +1,263 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.logger.Logger;
import io.druid.server.router.ConsistentHasher;
import org.junit.Assert;
import org.junit.Test;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
public class ConsistentHasherTest
{
private static HashFunction TEST_HASH_FN = Hashing.murmur3_128();
private static int NUM_ITERATIONS = 10000;
private static final Logger log = new Logger(ConsistentHasherTest.class);
@Test
public void testBasic() throws Exception
{
ConsistentHasher hasher = new ConsistentHasher(TEST_HASH_FN);
Set<String> nodes = new HashSet<>();
nodes.add("localhost:1");
nodes.add("localhost:2");
nodes.add("localhost:3");
nodes.add("localhost:4");
nodes.add("localhost:5");
Map<String, String> uuidServerMap = new HashMap<>();
hasher.updateKeys(nodes);
for (int i = 0; i < NUM_ITERATIONS; i++) {
UUID objectId = UUID.randomUUID();
String targetServer = hasher.findKey(StringUtils.toUtf8(objectId.toString()));
uuidServerMap.put(objectId.toString(), targetServer);
}
// check that the same UUIDs hash to the same servers on subsequent hashStr() calls
for (int i = 0; i < 2; i++) {
for (Map.Entry<String, String> entry : uuidServerMap.entrySet()) {
String targetServer = hasher.findKey(StringUtils.toUtf8(entry.getKey()));
Assert.assertEquals(entry.getValue(), targetServer);
}
}
}
@Test
public void testAddNode() throws Exception
{
ConsistentHasher hasher = new ConsistentHasher(TEST_HASH_FN);
Set<String> nodes = new HashSet<>();
nodes.add("localhost:1");
nodes.add("localhost:2");
nodes.add("localhost:3");
nodes.add("localhost:4");
nodes.add("localhost:5");
hasher.updateKeys(nodes);
Map<String, String> uuidServerMap = new HashMap<>();
for (int i = 0; i < NUM_ITERATIONS; i++) {
UUID objectId = UUID.randomUUID();
String targetServer = hasher.findKey(StringUtils.toUtf8(objectId.toString()));
uuidServerMap.put(objectId.toString(), targetServer);
}
nodes.add("localhost:6");
hasher.updateKeys(nodes);
int same = 0;
int diff = 0;
for (Map.Entry<String, String> entry : uuidServerMap.entrySet()) {
String targetServer = hasher.findKey(StringUtils.toUtf8(entry.getKey()));
if (entry.getValue().equals(targetServer)) {
same += 1;
} else {
diff += 1;
}
}
log.info(StringUtils.format("testAddNode Total: %s, Same: %s, Diff: %s", NUM_ITERATIONS, same, diff));
// ~1/6 of the entries should change, check that less than 1/5 of the entries hash differently
double diffRatio = ((double) diff) / NUM_ITERATIONS;
Assert.assertTrue(diffRatio < 0.20);
}
@Test
public void testRemoveNode() throws Exception
{
ConsistentHasher hasher = new ConsistentHasher(TEST_HASH_FN);
Set<String> nodes = new HashSet<>();
nodes.add("localhost:1");
nodes.add("localhost:2");
nodes.add("localhost:3");
nodes.add("localhost:4");
nodes.add("localhost:5");
hasher.updateKeys(nodes);
Map<String, String> uuidServerMap = new HashMap<>();
for (int i = 0; i < NUM_ITERATIONS; i++) {
UUID objectId = UUID.randomUUID();
String targetServer = hasher.findKey(StringUtils.toUtf8(objectId.toString()));
uuidServerMap.put(objectId.toString(), targetServer);
}
nodes.remove("localhost:3");
hasher.updateKeys(nodes);
int same = 0;
int diff = 0;
for (Map.Entry<String, String> entry : uuidServerMap.entrySet()) {
String targetServer = hasher.findKey(StringUtils.toUtf8(entry.getKey()));
if (entry.getValue().equals(targetServer)) {
same += 1;
} else {
diff += 1;
}
}
log.info(StringUtils.format("testRemoveNode Total: %s, Same: %s, Diff: %s", NUM_ITERATIONS, same, diff));
// ~1/5 of the entries should change, check that less than 1/4 of the entries hash differently
double diffRatio = ((double) diff) / NUM_ITERATIONS;
Assert.assertTrue(diffRatio < 0.25);
}
@Test
public void testInconsistentView1() throws Exception
{
Set<String> nodes = new HashSet<>();
nodes.add("localhost:1");
nodes.add("localhost:2");
nodes.add("localhost:3");
nodes.add("localhost:4");
nodes.add("localhost:5");
Set<String> nodes2 = new HashSet<>();
nodes2.add("localhost:1");
nodes2.add("localhost:3");
nodes2.add("localhost:4");
nodes2.add("localhost:5");
testInconsistentViewHelper("testInconsistentView1", nodes, nodes2, 0.33);
}
@Test
public void testInconsistentView2() throws Exception
{
Set<String> nodes = new HashSet<>();
nodes.add("localhost:1");
nodes.add("localhost:3");
nodes.add("localhost:4");
nodes.add("localhost:5");
Set<String> nodes2 = new HashSet<>();
nodes2.add("localhost:1");
nodes2.add("localhost:2");
nodes2.add("localhost:4");
nodes2.add("localhost:5");
testInconsistentViewHelper("testInconsistentView2", nodes, nodes2, 0.50);
}
@Test
public void testInconsistentView3() throws Exception
{
Set<String> nodes = new HashSet<>();
nodes.add("localhost:3");
nodes.add("localhost:4");
nodes.add("localhost:5");
Set<String> nodes2 = new HashSet<>();
nodes2.add("localhost:1");
nodes2.add("localhost:4");
nodes2.add("localhost:5");
testInconsistentViewHelper("testInconsistentView3", nodes, nodes2, 0.66);
}
/**
 * A two-node view versus a three-node view that share only "localhost:5":
 * the helper requires that no more than 95% of the generated keys resolve to different nodes.
 */
@Test
public void testInconsistentView4() throws Exception
{
  final Set<String> smallView = new HashSet<>();
  for (String node : new String[]{"localhost:2", "localhost:5"}) {
    smallView.add(node);
  }

  final Set<String> largeView = new HashSet<>();
  for (String node : new String[]{"localhost:1", "localhost:4", "localhost:5"}) {
    largeView.add(node);
  }

  testInconsistentViewHelper("testInconsistentView4", smallView, largeView, 0.95);
}
/**
 * Hashes the same randomly generated keys with two hashers that were given different node sets,
 * then checks how often the two hashers disagree on the chosen node.
 *
 * @param testName          label included in the log output and in the assertion failure message
 * @param nodes             node set passed to the first hasher
 * @param nodes2            node set passed to the second hasher
 * @param expectedDiffRatio inclusive upper bound on the fraction of keys allowed to map to different nodes
 */
public void testInconsistentViewHelper(
    String testName,
    Set<String> nodes,
    Set<String> nodes2,
    double expectedDiffRatio
) throws Exception
{
  ConsistentHasher hasher = new ConsistentHasher(TEST_HASH_FN);
  hasher.updateKeys(nodes);

  // Record which node the first hasher picks for each random UUID key.
  Map<String, String> uuidServerMap = new HashMap<>();
  for (int i = 0; i < NUM_ITERATIONS; i++) {
    String objectId = UUID.randomUUID().toString();
    uuidServerMap.put(objectId, hasher.findKey(StringUtils.toUtf8(objectId)));
  }

  ConsistentHasher hasher2 = new ConsistentHasher(TEST_HASH_FN);
  hasher2.updateKeys(nodes2);

  // Re-hash every key with the second hasher and compare directly; no intermediate map is needed.
  int same = 0;
  int diff = 0;
  for (Map.Entry<String, String> entry : uuidServerMap.entrySet()) {
    String otherServer = hasher2.findKey(StringUtils.toUtf8(entry.getKey()));
    if (entry.getValue().equals(otherServer)) {
      same += 1;
    } else {
      diff += 1;
    }
  }

  double actualDiffRatio = ((double) diff) / NUM_ITERATIONS;

  // Pass format arguments to the logger instead of pre-formatting, matching the second log call.
  log.info("%s Total: %s, Same: %s, Diff: %s", testName, NUM_ITERATIONS, same, diff);
  log.info("Expected diff ratio: %s, Actual diff ratio: %s", expectedDiffRatio, actualDiffRatio);

  // The keys are random, so include the observed numbers in the failure message for diagnosis.
  Assert.assertTrue(
      StringUtils.format(
          "%s: actual diff ratio %s exceeds expected bound %s",
          testName,
          actualDiffRatio,
          expectedDiffRatio
      ),
      actualDiffRatio <= expectedDiffRatio
  );
}
}

View File

@ -0,0 +1,249 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.logger.Logger;
import io.druid.server.router.RendezvousHasher;
import org.junit.Assert;
import org.junit.Test;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
/**
 * Tests for {@link RendezvousHasher}. They check that repeated lookups of the same key return the same node,
 * and that adding a node, removing a node, or hashing against a different node set changes the chosen node
 * for only a bounded fraction of randomly generated keys.
 */
public class RendezvousHasherTest
{
  // Number of random UUID keys generated per test; also the denominator of every diff ratio.
  private static final int NUM_ITERATIONS = 10000;
  private static final Logger log = new Logger(RendezvousHasherTest.class);

  @Test
  public void testBasic() throws Exception
  {
    RendezvousHasher hasher = new RendezvousHasher();
    Set<String> nodes = newNodeSet("localhost:1", "localhost:2", "localhost:3", "localhost:4", "localhost:5");
    Map<String, String> uuidServerMap = assignRandomKeys(hasher, nodes);

    // check that the same UUIDs hash to the same servers on subsequent chooseNode() calls
    for (int i = 0; i < 2; i++) {
      for (Map.Entry<String, String> entry : uuidServerMap.entrySet()) {
        String targetServer = hasher.chooseNode(nodes, StringUtils.toUtf8(entry.getKey()));
        Assert.assertEquals(entry.getValue(), targetServer);
      }
    }
  }

  @Test
  public void testAddNode() throws Exception
  {
    RendezvousHasher hasher = new RendezvousHasher();
    Set<String> nodes = newNodeSet("localhost:1", "localhost:2", "localhost:3", "localhost:4", "localhost:5");
    Map<String, String> uuidServerMap = assignRandomKeys(hasher, nodes);

    nodes.add("localhost:6");

    int diff = countReassigned(hasher, nodes, uuidServerMap);
    int same = uuidServerMap.size() - diff;
    log.info("testAddNode Total: %s, Same: %s, Diff: %s", NUM_ITERATIONS, same, diff);

    double diffRatio = ((double) diff) / NUM_ITERATIONS;
    Assert.assertTrue(
        StringUtils.format("testAddNode: diff ratio %s is not below 0.33", diffRatio),
        diffRatio < 0.33
    );
  }

  @Test
  public void testRemoveNode() throws Exception
  {
    RendezvousHasher hasher = new RendezvousHasher();
    Set<String> nodes = newNodeSet("localhost:1", "localhost:2", "localhost:3", "localhost:4", "localhost:5");
    Map<String, String> uuidServerMap = assignRandomKeys(hasher, nodes);

    nodes.remove("localhost:3");

    int diff = countReassigned(hasher, nodes, uuidServerMap);
    int same = uuidServerMap.size() - diff;
    log.info("testRemoveNode Total: %s, Same: %s, Diff: %s", NUM_ITERATIONS, same, diff);

    double diffRatio = ((double) diff) / NUM_ITERATIONS;
    Assert.assertTrue(
        StringUtils.format("testRemoveNode: diff ratio %s is not below 0.33", diffRatio),
        diffRatio < 0.33
    );
  }

  @Test
  public void testInconsistentView1() throws Exception
  {
    Set<String> nodes = newNodeSet("localhost:1", "localhost:2", "localhost:3", "localhost:4", "localhost:5");
    Set<String> nodes2 = newNodeSet("localhost:1", "localhost:3", "localhost:4", "localhost:5");
    testInconsistentViewHelper("testInconsistentView1", nodes, nodes2, 0.33);
  }

  @Test
  public void testInconsistentView2() throws Exception
  {
    Set<String> nodes = newNodeSet("localhost:1", "localhost:3", "localhost:4", "localhost:5");
    Set<String> nodes2 = newNodeSet("localhost:1", "localhost:2", "localhost:4", "localhost:5");
    testInconsistentViewHelper("testInconsistentView2", nodes, nodes2, 0.55);
  }

  @Test
  public void testInconsistentView3() throws Exception
  {
    Set<String> nodes = newNodeSet("localhost:3", "localhost:4", "localhost:5");
    Set<String> nodes2 = newNodeSet("localhost:1", "localhost:4", "localhost:5");
    testInconsistentViewHelper("testInconsistentView3", nodes, nodes2, 0.66);
  }

  @Test
  public void testInconsistentView4() throws Exception
  {
    Set<String> nodes = newNodeSet("localhost:2", "localhost:5");
    Set<String> nodes2 = newNodeSet("localhost:1", "localhost:4", "localhost:5");
    testInconsistentViewHelper("testInconsistentView4", nodes, nodes2, 0.95);
  }

  /**
   * Chooses nodes for the same random keys using two hasher instances that are given different node sets,
   * then asserts that the fraction of keys mapped to different nodes does not exceed the given bound.
   *
   * @param testName          label included in the log output and in the assertion failure message
   * @param nodes             node set used with the first hasher
   * @param nodes2            node set used with the second hasher
   * @param expectedDiffRatio inclusive upper bound on the fraction of keys allowed to map to different nodes
   */
  public void testInconsistentViewHelper(
      String testName,
      Set<String> nodes,
      Set<String> nodes2,
      double expectedDiffRatio
  ) throws Exception
  {
    RendezvousHasher hasher = new RendezvousHasher();
    Map<String, String> uuidServerMap = assignRandomKeys(hasher, nodes);

    RendezvousHasher hasher2 = new RendezvousHasher();
    int diff = countReassigned(hasher2, nodes2, uuidServerMap);
    int same = uuidServerMap.size() - diff;

    double actualDiffRatio = ((double) diff) / NUM_ITERATIONS;
    log.info("%s Total: %s, Same: %s, Diff: %s", testName, NUM_ITERATIONS, same, diff);
    log.info("Expected diff ratio: %s, Actual diff ratio: %s", expectedDiffRatio, actualDiffRatio);

    // The keys are random, so include the observed numbers in the failure message for diagnosis.
    Assert.assertTrue(
        StringUtils.format(
            "%s: actual diff ratio %s exceeds expected bound %s",
            testName,
            actualDiffRatio,
            expectedDiffRatio
        ),
        actualDiffRatio <= expectedDiffRatio
    );
  }

  /**
   * Builds a mutable set containing the given node names, added in argument order.
   */
  private static Set<String> newNodeSet(String... nodeNames)
  {
    Set<String> nodes = new HashSet<>();
    for (String nodeName : nodeNames) {
      nodes.add(nodeName);
    }
    return nodes;
  }

  /**
   * Generates NUM_ITERATIONS random UUID strings and records, for each, the node the hasher chooses from nodes.
   */
  private static Map<String, String> assignRandomKeys(RendezvousHasher hasher, Set<String> nodes)
  {
    Map<String, String> uuidServerMap = new HashMap<>();
    for (int i = 0; i < NUM_ITERATIONS; i++) {
      String objectId = UUID.randomUUID().toString();
      uuidServerMap.put(objectId, hasher.chooseNode(nodes, StringUtils.toUtf8(objectId)));
    }
    return uuidServerMap;
  }

  /**
   * Counts the recorded keys for which the hasher, choosing from nodes, picks a node other than the recorded one.
   */
  private static int countReassigned(
      RendezvousHasher hasher,
      Set<String> nodes,
      Map<String, String> uuidServerMap
  )
  {
    int diff = 0;
    for (Map.Entry<String, String> entry : uuidServerMap.entrySet()) {
      String targetServer = hasher.chooseNode(nodes, StringUtils.toUtf8(entry.getKey()));
      if (!entry.getValue().equals(targetServer)) {
        diff += 1;
      }
    }
    return diff;
  }
}

View File

@ -90,7 +90,8 @@ public class QueryHostFinderTest
public void testFindServer() throws Exception
{
QueryHostFinder queryRunner = new QueryHostFinder(
brokerSelector
brokerSelector,
new RendezvousHashAvaticaConnectionBalancer()
);
Server server = queryRunner.findServer(

View File

@ -50,6 +50,7 @@ import io.druid.server.AsyncQueryForwardingServlet;
import io.druid.server.http.RouterResource;
import io.druid.server.initialization.jetty.JettyServerInitializer;
import io.druid.server.metrics.QueryCountStatsProvider;
import io.druid.server.router.AvaticaConnectionBalancer;
import io.druid.server.router.CoordinatorRuleManager;
import io.druid.server.router.QueryHostFinder;
import io.druid.server.router.Router;
@ -94,6 +95,7 @@ public class CliRouter extends ServerRunnable
binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(9088);
JsonConfigProvider.bind(binder, "druid.router", TieredBrokerConfig.class);
JsonConfigProvider.bind(binder, "druid.router.avatica.balancer", AvaticaConnectionBalancer.class);
binder.bind(CoordinatorRuleManager.class);
LifecycleModule.register(binder, CoordinatorRuleManager.class);