Prefer adaptive replica selection over awareness attribute based routing (#1107)

* Prefer adaptive replica selection over awareness attribute based routing

Signed-off-by: jyothsna <donapv@amazon.com>

* Minor changes

Signed-off-by: jyothsna <donapv@amazon.com>

* Address review comments

Signed-off-by: jyothsna <donapv@amazon.com>

* Fix checkstyle

Signed-off-by: jyothsna <donapv@amazon.com>

* Minor fix

Signed-off-by: jyothsna <donapv@amazon.com>
This commit is contained in:
Venkata Jyothsna Donapati 2022-01-21 09:11:49 -08:00 committed by GitHub
parent 781156471a
commit 02c6a7f156
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 271 additions and 102 deletions

View File

@ -1,63 +0,0 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/
package org.opensearch.cluster.routing;
import org.opensearch.common.SuppressForbidden;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.test.OpenSearchTestCase;
import static org.hamcrest.Matchers.equalTo;
public class EvilSystemPropertyTests extends OpenSearchTestCase {
@SuppressForbidden(reason = "manipulates system properties for testing")
public void testDisableSearchAllocationAwareness() {
Settings indexSettings = Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "test")
.build();
OperationRouting routing = new OperationRouting(indexSettings,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
assertWarnings(OperationRouting.IGNORE_AWARENESS_ATTRIBUTES_DEPRECATION_MESSAGE);
assertThat(routing.getAwarenessAttributes().size(), equalTo(1));
assertThat(routing.getAwarenessAttributes().get(0), equalTo("test"));
System.setProperty("opensearch.search.ignore_awareness_attributes", "true");
try {
routing = new OperationRouting(indexSettings,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
assertTrue(routing.getAwarenessAttributes().isEmpty());
} finally {
System.clearProperty("opensearch.search.ignore_awareness_attributes");
}
}
}

View File

@ -38,7 +38,6 @@ import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider;
import org.opensearch.common.Nullable;
import org.opensearch.common.Strings;
import org.opensearch.common.logging.DeprecationLogger;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
@ -55,8 +54,6 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import static org.opensearch.common.Booleans.parseBoolean;
public class OperationRouting {
public static final Setting<Boolean> USE_ADAPTIVE_REPLICA_SELECTION_SETTING = Setting.boolSetting(
@ -66,53 +63,52 @@ public class OperationRouting {
Setting.Property.NodeScope
);
private static final DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(OperationRouting.class);
private static final String IGNORE_AWARENESS_ATTRIBUTES_PROPERTY = "opensearch.search.ignore_awareness_attributes";
static final String IGNORE_AWARENESS_ATTRIBUTES_DEPRECATION_MESSAGE =
"searches will not be routed based on awareness attributes starting in version 8.0.0; "
+ "to opt into this behaviour now please set the system property ["
+ IGNORE_AWARENESS_ATTRIBUTES_PROPERTY
+ "] to [true]";
private List<String> awarenessAttributes;
private boolean useAdaptiveReplicaSelection;
public static final String IGNORE_AWARENESS_ATTRIBUTES = "cluster.search.ignore_awareness_attributes";
public static final Setting<Boolean> IGNORE_AWARENESS_ATTRIBUTES_SETTING = Setting.boolSetting(
IGNORE_AWARENESS_ATTRIBUTES,
true,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);
private volatile List<String> awarenessAttributes;
private volatile boolean useAdaptiveReplicaSelection;
private volatile boolean ignoreAwarenessAttr;
public OperationRouting(Settings settings, ClusterSettings clusterSettings) {
// whether to ignore awareness attributes when routing requests
boolean ignoreAwarenessAttr = parseBoolean(System.getProperty(IGNORE_AWARENESS_ATTRIBUTES_PROPERTY), false);
if (ignoreAwarenessAttr == false) {
awarenessAttributes = AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.get(settings);
if (awarenessAttributes.isEmpty() == false) {
deprecationLogger.deprecate("searches_not_routed_on_awareness_attributes", IGNORE_AWARENESS_ATTRIBUTES_DEPRECATION_MESSAGE);
}
this.ignoreAwarenessAttr = clusterSettings.get(IGNORE_AWARENESS_ATTRIBUTES_SETTING);
this.awarenessAttributes = AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(
AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING,
this::setAwarenessAttributes
);
} else {
awarenessAttributes = Collections.emptyList();
}
this.useAdaptiveReplicaSelection = USE_ADAPTIVE_REPLICA_SELECTION_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(USE_ADAPTIVE_REPLICA_SELECTION_SETTING, this::setUseAdaptiveReplicaSelection);
clusterSettings.addSettingsUpdateConsumer(IGNORE_AWARENESS_ATTRIBUTES_SETTING, this::setIgnoreAwarenessAttributes);
}
void setUseAdaptiveReplicaSelection(boolean useAdaptiveReplicaSelection) {
this.useAdaptiveReplicaSelection = useAdaptiveReplicaSelection;
}
void setIgnoreAwarenessAttributes(boolean ignoreAwarenessAttributes) {
this.ignoreAwarenessAttr = ignoreAwarenessAttributes;
}
public boolean isIgnoreAwarenessAttr() {
return ignoreAwarenessAttr;
}
List<String> getAwarenessAttributes() {
return awarenessAttributes;
}
private void setAwarenessAttributes(List<String> awarenessAttributes) {
boolean ignoreAwarenessAttr = parseBoolean(System.getProperty(IGNORE_AWARENESS_ATTRIBUTES_PROPERTY), false);
if (ignoreAwarenessAttr == false) {
if (this.awarenessAttributes.isEmpty() && awarenessAttributes.isEmpty() == false) {
deprecationLogger.deprecate("searches_not_routed_on_awareness_attributes", IGNORE_AWARENESS_ATTRIBUTES_DEPRECATION_MESSAGE);
}
this.awarenessAttributes = awarenessAttributes;
}
public boolean ignoreAwarenessAttributes() {
return this.awarenessAttributes.isEmpty() || this.ignoreAwarenessAttr;
}
public ShardIterator indexShards(ClusterState clusterState, String index, String id, @Nullable String routing) {
@ -286,8 +282,7 @@ public class OperationRouting {
// for a different element in the list by also incorporating the
// shard ID into the hash of the user-supplied preference key.
routingHash = 31 * routingHash + indexShard.shardId.hashCode();
if (awarenessAttributes.isEmpty()) {
if (ignoreAwarenessAttributes()) {
return indexShard.activeInitializingShardsIt(routingHash);
} else {
return indexShard.preferAttributesActiveInitializingShardsIt(awarenessAttributes, nodes, routingHash);
@ -300,7 +295,7 @@ public class OperationRouting {
@Nullable ResponseCollectorService collectorService,
@Nullable Map<String, Long> nodeCounts
) {
if (awarenessAttributes.isEmpty()) {
if (ignoreAwarenessAttributes()) {
if (useAdaptiveReplicaSelection) {
return indexShard.activeInitializingShardsRankedIt(collectorService, nodeCounts);
} else {

View File

@ -519,6 +519,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
FastVectorHighlighter.SETTING_TV_HIGHLIGHT_MULTI_VALUE,
Node.BREAKER_TYPE_KEY,
OperationRouting.USE_ADAPTIVE_REPLICA_SELECTION_SETTING,
OperationRouting.IGNORE_AWARENESS_ATTRIBUTES_SETTING,
IndexGraveyard.SETTING_MAX_TOMBSTONES,
PersistentTasksClusterService.CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING,
EnableAssignmentDecider.CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING,

View File

@ -0,0 +1,63 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.cluster.routing;
import org.junit.After;
import org.opensearch.common.settings.Settings;
import org.opensearch.test.OpenSearchIntegTestCase;
import static org.hamcrest.Matchers.equalTo;
import static org.opensearch.cluster.routing.OperationRouting.IGNORE_AWARENESS_ATTRIBUTES;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
public class OperationRoutingAwarenessTests extends OpenSearchIntegTestCase {
@After
public void cleanup() {
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder().putNull("*")));
}
public void testToggleSearchAllocationAwareness() {
OperationRouting routing = internalCluster().clusterService().operationRouting();
// Update awareness settings
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put("cluster.routing.allocation.awareness.attributes", "zone"))
.get();
assertThat(routing.getAwarenessAttributes().size(), equalTo(1));
assertThat(routing.getAwarenessAttributes().get(0), equalTo("zone"));
assertTrue(internalCluster().clusterService().operationRouting().ignoreAwarenessAttributes());
// Unset ignore awareness attributes
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put(IGNORE_AWARENESS_ATTRIBUTES, false))
.get();
// assert that awareness attributes hasn't changed
assertThat(routing.getAwarenessAttributes().size(), equalTo(1));
assertThat(routing.getAwarenessAttributes().get(0), equalTo("zone"));
assertFalse(internalCluster().clusterService().operationRouting().isIgnoreAwarenessAttr());
assertFalse(internalCluster().clusterService().operationRouting().ignoreAwarenessAttributes());
// Set ignore awareness attributes to true
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put(IGNORE_AWARENESS_ATTRIBUTES, true))
.get();
// assert that awareness attributes hasn't changed
assertThat(routing.getAwarenessAttributes().size(), equalTo(1));
assertThat(routing.getAwarenessAttributes().get(0), equalTo("zone"));
assertTrue(routing.isIgnoreAwarenessAttr());
assertTrue(internalCluster().clusterService().operationRouting().ignoreAwarenessAttributes());
}
}

View File

@ -35,6 +35,9 @@ import org.opensearch.Version;
import org.opensearch.action.support.replication.ClusterStateCreationUtils;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodeRole;
import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.ClusterSettings;
@ -51,6 +54,7 @@ import org.opensearch.threadpool.TestThreadPool;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -58,12 +62,16 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import static org.opensearch.cluster.routing.OperationRouting.IGNORE_AWARENESS_ATTRIBUTES_DEPRECATION_MESSAGE;
import static java.util.Collections.singletonMap;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.object.HasToString.hasToString;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_CREATION_DATE;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_VERSION_CREATED;
public class OperationRoutingTests extends OpenSearchTestCase {
public void testGenerateShardId() {
@ -658,6 +666,125 @@ public class OperationRoutingTests extends OpenSearchTestCase {
terminate(threadPool);
}
// Regression test to ignore awareness attributes. This test creates shards in different zones and simulates stress
// on nodes in one zone to test if Adapative Replica Selection smartly routes the request to a node in different zone
// by ignoring the zone awareness attributes.
public void testAdaptiveReplicaSelectionWithZoneAwarenessIgnored() throws Exception {
final int numIndices = 2;
final int numShards = 1;
final int numReplicas = 1;
final String[] indexNames = new String[numIndices];
for (int i = 0; i < numIndices; i++) {
indexNames[i] = "test" + i;
}
DiscoveryNode[] allNodes = setupNodes();
ClusterState state = ClusterStateCreationUtils.state(allNodes[0], allNodes[3], allNodes);
// Updates cluster state by assigning shard copies on nodes
state = updateStatetoTestARS(indexNames, numShards, numReplicas, allNodes, state);
Settings awarenessSetting = Settings.builder().put("cluster.routing.allocation.awareness.attributes", "zone").build();
TestThreadPool threadPool = new TestThreadPool("testThatOnlyNodesSupport");
ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool);
OperationRouting opRouting = new OperationRouting(
awarenessSetting,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
);
opRouting.setUseAdaptiveReplicaSelection(true);
assertTrue(opRouting.ignoreAwarenessAttributes());
List<ShardRouting> searchedShards = new ArrayList<>(numShards);
Set<String> selectedNodes = new HashSet<>(numShards);
ResponseCollectorService collector = new ResponseCollectorService(clusterService);
Map<String, Long> outstandingRequests = new HashMap<>();
GroupShardsIterator<ShardIterator> groupIterator = opRouting.searchShards(
state,
indexNames,
null,
null,
collector,
outstandingRequests
);
assertThat("One group per index shard", groupIterator.size(), equalTo(numIndices * numShards));
// Test that the shards use a round-robin pattern when there are no stats
assertThat(groupIterator.size(), equalTo(numIndices * numShards));
assertThat(groupIterator.get(0).size(), equalTo(numReplicas + 1));
ShardRouting firstChoice = groupIterator.get(0).nextOrNull();
assertNotNull(firstChoice);
searchedShards.add(firstChoice);
selectedNodes.add(firstChoice.currentNodeId());
groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests);
assertThat(groupIterator.size(), equalTo(numIndices * numShards));
assertThat(groupIterator.get(0).size(), equalTo(numReplicas + 1));
ShardRouting secondChoice = groupIterator.get(0).nextOrNull();
assertNotNull(secondChoice);
searchedShards.add(secondChoice);
selectedNodes.add(secondChoice.currentNodeId());
// All the shards should be ranked equally since there are no stats yet
assertTrue(selectedNodes.contains("node_b2"));
// Since the primary shards are divided randomly between node_a0 and node_a1
assertTrue(selectedNodes.contains("node_a0") || selectedNodes.contains("node_a1"));
// Now let's start adding node metrics, since that will affect which node is chosen. Adding more load to node_b2
collector.addNodeStatistics("node_a0", 1, TimeValue.timeValueMillis(50).nanos(), TimeValue.timeValueMillis(50).nanos());
collector.addNodeStatistics("node_a1", 20, TimeValue.timeValueMillis(100).nanos(), TimeValue.timeValueMillis(150).nanos());
collector.addNodeStatistics("node_b2", 40, TimeValue.timeValueMillis(250).nanos(), TimeValue.timeValueMillis(250).nanos());
outstandingRequests.put("node_a0", 1L);
outstandingRequests.put("node_a1", 1L);
outstandingRequests.put("node_b2", 1L);
groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests);
// node_a0 or node_a1 should be the lowest ranked node to start
groupIterator.forEach(shardRoutings -> assertThat(shardRoutings.nextOrNull().currentNodeId(), containsString("node_a")));
// Adding more load to node_a0
collector.addNodeStatistics("node_a0", 10, TimeValue.timeValueMillis(200).nanos(), TimeValue.timeValueMillis(150).nanos());
groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests);
// Adding more load to node_a0 and node_a1 from zone-a
collector.addNodeStatistics("node_a1", 100, TimeValue.timeValueMillis(300).nanos(), TimeValue.timeValueMillis(250).nanos());
collector.addNodeStatistics("node_a0", 100, TimeValue.timeValueMillis(300).nanos(), TimeValue.timeValueMillis(250).nanos());
groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests);
// ARS should pick node_b2 from zone-b since both node_a0 and node_a1 are overloaded
groupIterator.forEach(shardRoutings -> assertThat(shardRoutings.nextOrNull().currentNodeId(), containsString("node_b")));
IOUtils.close(clusterService);
terminate(threadPool);
}
private DiscoveryNode[] setupNodes() {
// Sets up two data nodes in zone-a and one data node in zone-b
List<String> zones = Arrays.asList("a", "a", "b");
DiscoveryNode[] allNodes = new DiscoveryNode[4];
int i = 0;
for (String zone : zones) {
DiscoveryNode node = new DiscoveryNode(
"node_" + zone + i,
buildNewFakeTransportAddress(),
singletonMap("zone", zone),
Collections.singleton(DiscoveryNodeRole.DATA_ROLE),
Version.CURRENT
);
allNodes[i++] = node;
}
DiscoveryNode master = new DiscoveryNode(
"master",
buildNewFakeTransportAddress(),
Collections.emptyMap(),
Collections.singleton(DiscoveryNodeRole.MASTER_ROLE),
Version.CURRENT
);
allNodes[i] = master;
return allNodes;
}
public void testAllocationAwarenessDeprecation() {
OperationRouting routing = new OperationRouting(
Settings.builder()
@ -665,7 +792,53 @@ public class OperationRoutingTests extends OpenSearchTestCase {
.build(),
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
);
assertWarnings(IGNORE_AWARENESS_ATTRIBUTES_DEPRECATION_MESSAGE);
}
/**
* The following setup is created to test ARS
*/
private ClusterState updateStatetoTestARS(
String[] indices,
int numberOfShards,
int numberOfReplicas,
DiscoveryNode[] nodes,
ClusterState state
) {
RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
Metadata.Builder metadataBuilder = Metadata.builder();
ClusterState.Builder clusterState = ClusterState.builder(state);
for (String index : indices) {
IndexMetadata indexMetadata = IndexMetadata.builder(index)
.settings(
Settings.builder()
.put(SETTING_VERSION_CREATED, Version.CURRENT)
.put(SETTING_NUMBER_OF_SHARDS, numberOfShards)
.put(SETTING_NUMBER_OF_REPLICAS, numberOfReplicas)
.put(SETTING_CREATION_DATE, System.currentTimeMillis())
)
.build();
metadataBuilder.put(indexMetadata, false).generateClusterUuidIfNeeded();
IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(indexMetadata.getIndex());
for (int i = 0; i < numberOfShards; i++) {
final ShardId shardId = new ShardId(index, "_na_", i);
IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(shardId);
// Assign all the primary shards on nodes in zone-a (node_a0 or node_a1)
indexShardRoutingBuilder.addShard(
TestShardRouting.newShardRouting(index, i, nodes[randomInt(1)].getId(), null, true, ShardRoutingState.STARTED)
);
for (int replica = 0; replica < numberOfReplicas; replica++) {
// Assign all the replicas on nodes in zone-b (node_b2)
indexShardRoutingBuilder.addShard(
TestShardRouting.newShardRouting(index, i, nodes[2].getId(), null, false, ShardRoutingState.STARTED)
);
}
indexRoutingTableBuilder.addIndexShard(indexShardRoutingBuilder.build());
}
routingTableBuilder.add(indexRoutingTableBuilder.build());
}
clusterState.metadata(metadataBuilder);
clusterState.routingTable(routingTableBuilder.build());
return clusterState.build();
}
}