Search: Fairer balancing when routing searches by session ID (#24671)
A user reported uneven balancing of load on nodes handling search requests from Kibana which supplies a session ID in a routing preference. Each shardId was selecting the same node for a given session ID because one data node had all primaries and the other data node held all replicas after cluster startup. This change counteracts the tendency to opt for the same node given the same user-supplied preference by incorporating shard ID in the hash of the preference key. This will help randomise node choices across shards. Closes #24642
This commit is contained in:
parent
34093735e3
commit
c71ae3519f
|
@ -19,6 +19,7 @@
|
|||
|
||||
package org.elasticsearch.cluster.routing;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
|
@ -177,10 +178,20 @@ public class OperationRouting extends AbstractComponent {
|
|||
}
|
||||
}
|
||||
// if not, then use it as the index
|
||||
int routingHash = Murmur3HashFunction.hash(preference);
|
||||
if (nodes.getMinNodeVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
|
||||
// The AllocationService lists shards in a fixed order based on nodes
|
||||
// so earlier versions of this class would have a tendency to
|
||||
// select the same node across different shardIds.
|
||||
// Better overall balancing can be achieved if each shardId opts
|
||||
// for a different element in the list by also incorporating the
|
||||
// shard ID into the hash of the user-supplied preference key.
|
||||
routingHash = 31 * routingHash + indexShard.shardId.hashCode();
|
||||
}
|
||||
if (awarenessAttributes.length == 0) {
|
||||
return indexShard.activeInitializingShardsIt(Murmur3HashFunction.hash(preference));
|
||||
return indexShard.activeInitializingShardsIt(routingHash);
|
||||
} else {
|
||||
return indexShard.preferAttributesActiveInitializingShardsIt(awarenessAttributes, nodes, Murmur3HashFunction.hash(preference));
|
||||
return indexShard.preferAttributesActiveInitializingShardsIt(awarenessAttributes, nodes, routingHash);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -30,6 +30,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
|
|||
import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
||||
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
|
||||
import org.elasticsearch.cluster.routing.RoutingTable;
|
||||
import org.elasticsearch.cluster.routing.RoutingTable.Builder;
|
||||
import org.elasticsearch.cluster.routing.ShardRoutingState;
|
||||
import org.elasticsearch.cluster.routing.TestShardRouting;
|
||||
import org.elasticsearch.cluster.routing.UnassignedInfo;
|
||||
|
@ -271,6 +272,51 @@ public class ClusterStateCreationUtils {
|
|||
state.routingTable(RoutingTable.builder().add(indexRoutingTableBuilder.build()).build());
|
||||
return state.build();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Creates cluster state with several indexes, shards and replicas and all shards STARTED.
|
||||
*/
|
||||
public static ClusterState stateWithAssignedPrimariesAndReplicas(String[] indices, int numberOfShards, int numberOfReplicas) {
|
||||
|
||||
int numberOfDataNodes = numberOfReplicas + 1;
|
||||
DiscoveryNodes.Builder discoBuilder = DiscoveryNodes.builder();
|
||||
for (int i = 0; i < numberOfDataNodes + 1; i++) {
|
||||
final DiscoveryNode node = newNode(i);
|
||||
discoBuilder = discoBuilder.add(node);
|
||||
}
|
||||
discoBuilder.localNodeId(newNode(0).getId());
|
||||
discoBuilder.masterNodeId(newNode(numberOfDataNodes + 1).getId());
|
||||
ClusterState.Builder state = ClusterState.builder(new ClusterName("test"));
|
||||
state.nodes(discoBuilder);
|
||||
Builder routingTableBuilder = RoutingTable.builder();
|
||||
|
||||
org.elasticsearch.cluster.metadata.MetaData.Builder metadataBuilder = MetaData.builder();
|
||||
|
||||
for (String index : indices) {
|
||||
IndexMetaData indexMetaData = IndexMetaData.builder(index)
|
||||
.settings(Settings.builder().put(SETTING_VERSION_CREATED, Version.CURRENT).put(SETTING_NUMBER_OF_SHARDS, numberOfShards)
|
||||
.put(SETTING_NUMBER_OF_REPLICAS, numberOfReplicas).put(SETTING_CREATION_DATE, System.currentTimeMillis()))
|
||||
.build();
|
||||
metadataBuilder.put(indexMetaData, false).generateClusterUuidIfNeeded();
|
||||
IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(indexMetaData.getIndex());
|
||||
for (int i = 0; i < numberOfShards; i++) {
|
||||
final ShardId shardId = new ShardId(index, "_na_", i);
|
||||
IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(shardId);
|
||||
indexShardRoutingBuilder
|
||||
.addShard(TestShardRouting.newShardRouting(index, i, newNode(0).getId(), null, true, ShardRoutingState.STARTED));
|
||||
for (int replica = 0; replica < numberOfReplicas; replica++) {
|
||||
indexShardRoutingBuilder.addShard(TestShardRouting.newShardRouting(index, i, newNode(replica + 1).getId(), null, false,
|
||||
ShardRoutingState.STARTED));
|
||||
}
|
||||
indexRoutingTableBuilder.addIndexShard(indexShardRoutingBuilder.build());
|
||||
}
|
||||
routingTableBuilder.add(indexRoutingTableBuilder.build());
|
||||
}
|
||||
state.metaData(metadataBuilder);
|
||||
state.routingTable(routingTableBuilder.build());
|
||||
return state.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates cluster state with and index that has one shard and as many replicas as numberOfReplicas.
|
||||
|
|
|
@ -21,8 +21,11 @@ package org.elasticsearch.cluster.routing;
|
|||
import org.apache.lucene.util.IOUtils;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.settings.ClusterSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.Index;
|
||||
|
@ -44,6 +47,7 @@ import java.util.TreeMap;
|
|||
import static org.hamcrest.CoreMatchers.containsString;
|
||||
import static org.hamcrest.Matchers.containsInAnyOrder;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.hamcrest.object.HasToString.hasToString;
|
||||
|
||||
public class OperationRoutingTests extends ESTestCase{
|
||||
|
@ -367,6 +371,65 @@ public class OperationRoutingTests extends ESTestCase{
|
|||
terminate(threadPool);
|
||||
}
|
||||
}
|
||||
|
||||
public void testFairSessionIdPreferences() throws InterruptedException, IOException {
|
||||
// Ensure that a user session is re-routed back to same nodes for
|
||||
// subsequent searches and that the nodes are selected fairly i.e.
|
||||
// given identically sorted lists of nodes across all shard IDs
|
||||
// each shard ID doesn't pick the same node.
|
||||
final int numIndices = randomIntBetween(1, 3);
|
||||
final int numShards = randomIntBetween(2, 10);
|
||||
final int numReplicas = randomIntBetween(1, 3);
|
||||
final String[] indexNames = new String[numIndices];
|
||||
for (int i = 0; i < numIndices; i++) {
|
||||
indexNames[i] = "test" + i;
|
||||
}
|
||||
ClusterState state = ClusterStateCreationUtils.stateWithAssignedPrimariesAndReplicas(indexNames, numShards, numReplicas);
|
||||
final int numRepeatedSearches = 4;
|
||||
List<ShardRouting> sessionsfirstSearch = null;
|
||||
OperationRouting opRouting = new OperationRouting(Settings.EMPTY,
|
||||
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
|
||||
String sessionKey = randomAlphaOfLength(10);
|
||||
for (int i = 0; i < numRepeatedSearches; i++) {
|
||||
List<ShardRouting> searchedShards = new ArrayList<>(numShards);
|
||||
Set<String> selectedNodes = new HashSet<>(numShards);
|
||||
final GroupShardsIterator<ShardIterator> groupIterator = opRouting.searchShards(state, indexNames, null, sessionKey);
|
||||
|
||||
assertThat("One group per index shard", groupIterator.size(), equalTo(numIndices * numShards));
|
||||
for (ShardIterator shardIterator : groupIterator) {
|
||||
assertThat(shardIterator.size(), equalTo(numReplicas + 1));
|
||||
|
||||
ShardRouting firstChoice = shardIterator.nextOrNull();
|
||||
assertNotNull(firstChoice);
|
||||
ShardRouting duelFirst = duelGetShards(state, firstChoice.shardId(), sessionKey).nextOrNull();
|
||||
assertThat("Regression test failure", duelFirst, equalTo(firstChoice));
|
||||
|
||||
searchedShards.add(firstChoice);
|
||||
selectedNodes.add(firstChoice.currentNodeId());
|
||||
}
|
||||
if (sessionsfirstSearch == null) {
|
||||
sessionsfirstSearch = searchedShards;
|
||||
} else {
|
||||
assertThat("Sessions must reuse same replica choices", searchedShards, equalTo(sessionsfirstSearch));
|
||||
}
|
||||
|
||||
// 2 is the bare minimum number of nodes we can reliably expect from
|
||||
// randomized tests in my experiments over thousands of iterations.
|
||||
// Ideally we would test for greater levels of machine utilisation
|
||||
// given a configuration with many nodes but the nature of hash
|
||||
// collisions means we can't always rely on optimal node usage in
|
||||
// all cases.
|
||||
assertThat("Search should use more than one of the nodes", selectedNodes.size(), greaterThan(1));
|
||||
}
|
||||
}
|
||||
|
||||
// Regression test for the routing logic - implements same hashing logic
|
||||
private ShardIterator duelGetShards(ClusterState clusterState, ShardId shardId, String sessionId) {
|
||||
final IndexShardRoutingTable indexShard = clusterState.getRoutingTable().shardRoutingTable(shardId.getIndexName(), shardId.getId());
|
||||
int routingHash = Murmur3HashFunction.hash(sessionId);
|
||||
routingHash = 31 * routingHash + indexShard.shardId.hashCode();
|
||||
return indexShard.activeInitializingShardsIt(routingHash);
|
||||
}
|
||||
|
||||
public void testThatOnlyNodesSupportNodeIds() throws InterruptedException, IOException {
|
||||
TestThreadPool threadPool = null;
|
||||
|
|
Loading…
Reference in New Issue