Makes the same_shard host dyanamically updatable (#23397)

Previously, cluster.routing.allocation.same_shard.host was not a dynamic
setting and could not be updated after startup.  This commit changes the
behavior to allow the setting to be dynamically updatable.  The
documentation already states that the setting is dynamic so no
documentation changes are required.

Closes #22992
This commit is contained in:
Ali Beyad 2017-02-28 12:48:54 -05:00 committed by GitHub
parent 5be7f6a76f
commit 5e2e45cad9
10 changed files with 128 additions and 31 deletions

View File

@ -172,7 +172,7 @@ public class ClusterModule extends AbstractModule {
addAllocationDecider(deciders, new NodeVersionAllocationDecider(settings));
addAllocationDecider(deciders, new SnapshotInProgressAllocationDecider(settings));
addAllocationDecider(deciders, new FilterAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new SameShardAllocationDecider(settings));
addAllocationDecider(deciders, new SameShardAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new DiskThresholdDecider(settings, clusterSettings));
addAllocationDecider(deciders, new ThrottlingAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new ShardsLimitAllocationDecider(settings, clusterSettings));

View File

@ -23,7 +23,9 @@ import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
/**
@ -46,14 +48,23 @@ public class SameShardAllocationDecider extends AllocationDecider {
public static final String NAME = "same_shard";
public static final Setting<Boolean> CLUSTER_ROUTING_ALLOCATION_SAME_HOST_SETTING =
Setting.boolSetting("cluster.routing.allocation.same_shard.host", false, Setting.Property.NodeScope);
Setting.boolSetting("cluster.routing.allocation.same_shard.host", false, Property.Dynamic, Property.NodeScope);
private final boolean sameHost;
private volatile boolean sameHost;
public SameShardAllocationDecider(Settings settings) {
public SameShardAllocationDecider(Settings settings, ClusterSettings clusterSettings) {
super(settings);
this.sameHost = CLUSTER_ROUTING_ALLOCATION_SAME_HOST_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_SAME_HOST_SETTING, this::setSameHost);
}
/**
* Sets the same host setting. {@code true} if allocating the same shard copy to the same host
* should not be allowed, even when multiple nodes are being run on the same host. {@code false}
* otherwise.
*/
private void setSameHost(boolean sameHost) {
this.sameHost = sameHost;
}
@Override

View File

@ -56,11 +56,12 @@ import static org.elasticsearch.cluster.routing.ShardRoutingState.UNASSIGNED;
public class FilterAllocationDeciderTests extends ESAllocationTestCase {
public void testFilterInitialRecovery() {
FilterAllocationDecider filterAllocationDecider = new FilterAllocationDecider(Settings.EMPTY,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
FilterAllocationDecider filterAllocationDecider = new FilterAllocationDecider(Settings.EMPTY, clusterSettings);
AllocationDeciders allocationDeciders = new AllocationDeciders(Settings.EMPTY,
Arrays.asList(filterAllocationDecider,
new SameShardAllocationDecider(Settings.EMPTY), new ReplicaAfterPrimaryActiveAllocationDecider(Settings.EMPTY)));
new SameShardAllocationDecider(Settings.EMPTY, clusterSettings),
new ReplicaAfterPrimaryActiveAllocationDecider(Settings.EMPTY)));
AllocationService service = new AllocationService(Settings.builder().build(), allocationDeciders,
new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE);
ClusterState state = createInitialClusterState(service, Settings.builder().put("index.routing.allocation.initial_recovery._id",

View File

@ -37,6 +37,7 @@ import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.cluster.routing.allocation.decider.ReplicaAfterPrimaryActiveAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.SameShardAllocationDecider;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.gateway.TestGatewayAllocator;
import org.hamcrest.Matchers;
@ -58,8 +59,9 @@ public class RandomAllocationDeciderTests extends ESAllocationTestCase {
public void testRandomDecisions() {
RandomAllocationDecider randomAllocationDecider = new RandomAllocationDecider(random());
AllocationService strategy = new AllocationService(Settings.builder().build(), new AllocationDeciders(Settings.EMPTY,
new HashSet<>(Arrays.asList(new SameShardAllocationDecider(Settings.EMPTY), new ReplicaAfterPrimaryActiveAllocationDecider(Settings.EMPTY),
randomAllocationDecider))), new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE);
new HashSet<>(Arrays.asList(new SameShardAllocationDecider(Settings.EMPTY,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)), new ReplicaAfterPrimaryActiveAllocationDecider(Settings.EMPTY),
randomAllocationDecider))), new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE);
int indices = scaledRandomIntBetween(1, 20);
Builder metaBuilder = MetaData.builder();
int maxNumReplicas = 1;

View File

@ -40,6 +40,7 @@ import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.cluster.routing.allocation.decider.SameShardAllocationDecider;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
@ -98,7 +99,8 @@ public class SameShardRoutingTests extends ESAllocationTestCase {
}
public void testForceAllocatePrimaryOnSameNodeNotAllowed() {
SameShardAllocationDecider decider = new SameShardAllocationDecider(Settings.EMPTY);
SameShardAllocationDecider decider = new SameShardAllocationDecider(
Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
ClusterState clusterState = ClusterStateCreationUtils.state("idx", randomIntBetween(2, 4), 1);
Index index = clusterState.getMetaData().index("idx").getIndex();
ShardRouting primaryShard = clusterState.routingTable().index(index).shard(0).primaryShard();
@ -123,4 +125,33 @@ public class SameShardRoutingTests extends ESAllocationTestCase {
decision = decider.canForceAllocatePrimary(newPrimary, unassignedNode, routingAllocation);
assertEquals(Decision.Type.YES, decision.type());
}
public void testUpdateSameHostSetting() {
AllocationService strategy = createAllocationService(
Settings.builder().put(SameShardAllocationDecider.CLUSTER_ROUTING_ALLOCATION_SAME_HOST_SETTING.getKey(), true).build());
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(2).numberOfReplicas(1))
.build();
RoutingTable routingTable = RoutingTable.builder()
.addAsNew(metaData.index("test"))
.build();
ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).metaData(metaData)
.routingTable(routingTable).build();
logger.info("--> adding two nodes with the same host");
clusterState = ClusterState.builder(clusterState).nodes(
DiscoveryNodes.builder()
.add(new DiscoveryNode("node1", "node1", "node1", "test1", "test1", buildNewFakeTransportAddress(), emptyMap(),
MASTER_DATA_ROLES, Version.CURRENT))
.add(new DiscoveryNode("node2", "node2", "node2", "test1", "test1", buildNewFakeTransportAddress(), emptyMap(),
MASTER_DATA_ROLES, Version.CURRENT))).build();
clusterState = strategy.reroute(clusterState, "reroute");
assertThat(numberOfShardsOfType(clusterState.getRoutingNodes(), ShardRoutingState.INITIALIZING), equalTo(2));
logger.info("--> start all primary shards, no replica will be started since its on the same host");
clusterState = strategy.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING));
}
}

View File

@ -92,9 +92,10 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
ImmutableOpenMap<String, Long> shardSizes = shardSizesBuilder.build();
final ClusterInfo clusterInfo = new DevNullClusterInfo(usages, usages, shardSizes);
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
AllocationDeciders deciders = new AllocationDeciders(Settings.EMPTY,
new HashSet<>(Arrays.asList(
new SameShardAllocationDecider(Settings.EMPTY),
new SameShardAllocationDecider(Settings.EMPTY, clusterSettings),
makeDecider(diskSettings))));
ClusterInfoService cis = new ClusterInfoService() {
@ -187,7 +188,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
deciders = new AllocationDeciders(Settings.EMPTY,
new HashSet<>(Arrays.asList(
new SameShardAllocationDecider(Settings.EMPTY),
new SameShardAllocationDecider(Settings.EMPTY, clusterSettings),
makeDecider(diskSettings))));
strategy = new AllocationService(Settings.builder()
@ -217,7 +218,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
deciders = new AllocationDeciders(Settings.EMPTY,
new HashSet<>(Arrays.asList(
new SameShardAllocationDecider(Settings.EMPTY),
new SameShardAllocationDecider(Settings.EMPTY, clusterSettings),
makeDecider(diskSettings))));
strategy = new AllocationService(Settings.builder()
@ -279,9 +280,10 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
ImmutableOpenMap<String, Long> shardSizes = shardSizesBuilder.build();
final ClusterInfo clusterInfo = new DevNullClusterInfo(usages, usages, shardSizes);
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
AllocationDeciders deciders = new AllocationDeciders(Settings.EMPTY,
new HashSet<>(Arrays.asList(
new SameShardAllocationDecider(Settings.EMPTY),
new SameShardAllocationDecider(Settings.EMPTY, clusterSettings),
makeDecider(diskSettings))));
ClusterInfoService cis = new ClusterInfoService() {
@ -414,7 +416,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
deciders = new AllocationDeciders(Settings.EMPTY,
new HashSet<>(Arrays.asList(
new SameShardAllocationDecider(Settings.EMPTY),
new SameShardAllocationDecider(Settings.EMPTY, clusterSettings),
makeDecider(diskSettings))));
strategy = new AllocationService(Settings.builder()
@ -444,7 +446,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
deciders = new AllocationDeciders(Settings.EMPTY,
new HashSet<>(Arrays.asList(
new SameShardAllocationDecider(Settings.EMPTY),
new SameShardAllocationDecider(Settings.EMPTY, clusterSettings),
makeDecider(diskSettings))));
strategy = new AllocationService(Settings.builder()
@ -535,7 +537,9 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
AllocationDeciders deciders = new AllocationDeciders(Settings.EMPTY,
new HashSet<>(Arrays.asList(
new SameShardAllocationDecider(Settings.EMPTY),
new SameShardAllocationDecider(
Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
),
makeDecider(diskSettings))));
ClusterInfoService cis = new ClusterInfoService() {
@ -605,7 +609,9 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
AllocationDeciders deciders = new AllocationDeciders(Settings.EMPTY,
new HashSet<>(Arrays.asList(
new SameShardAllocationDecider(Settings.EMPTY),
new SameShardAllocationDecider(
Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
),
makeDecider(diskSettings))));
ClusterInfoService cis = new ClusterInfoService() {
@ -710,7 +716,9 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
DiskThresholdDecider decider = makeDecider(diskSettings);
AllocationDeciders deciders = new AllocationDeciders(Settings.EMPTY,
new HashSet<>(Arrays.asList(new SameShardAllocationDecider(Settings.EMPTY), decider)));
new HashSet<>(Arrays.asList(new SameShardAllocationDecider(
Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
), decider)));
ClusterInfoService cis = new ClusterInfoService() {
@Override
@ -913,7 +921,10 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
}
};
AllocationDeciders deciders = new AllocationDeciders(Settings.EMPTY, new HashSet<>(Arrays.asList(
new SameShardAllocationDecider(Settings.EMPTY), diskThresholdDecider
new SameShardAllocationDecider(
Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
),
diskThresholdDecider
)));
AllocationService strategy = new AllocationService(Settings.builder()
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
@ -1011,7 +1022,10 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
};
AllocationDeciders deciders = new AllocationDeciders(Settings.EMPTY, new HashSet<>(Arrays.asList(
new SameShardAllocationDecider(Settings.EMPTY), diskThresholdDecider
new SameShardAllocationDecider(
Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
),
diskThresholdDecider
)));
AllocationService strategy = new AllocationService(Settings.builder()

View File

@ -18,23 +18,28 @@
*/
package org.elasticsearch.cluster.routing.allocation.decider;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESIntegTestCase;
import java.util.Set;
import static org.elasticsearch.cluster.routing.allocation.decider.SameShardAllocationDecider.CLUSTER_ROUTING_ALLOCATION_SAME_HOST_SETTING;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
/**
* Simple integration for {@link EnableAllocationDecider} there is a more exhaustive unittest in
* {@link EnableAllocationTests} this test is meant to check if the actual update of the settings
* works as expected.
* An integration test for testing updating shard allocation/routing settings and
* ensuring the updated settings take effect as expected.
*/
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class EnableAllocationDeciderIT extends ESIntegTestCase {
public class UpdateShardAllocationSettingsIT extends ESIntegTestCase {
/**
* Tests that updating the {@link EnableAllocationDecider} related settings works as expected.
*/
public void testEnableRebalance() throws InterruptedException {
final String firstNode = internalCluster().startNode();
client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder().put(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), EnableAllocationDecider.Rebalance.NONE)).get();
@ -74,4 +79,32 @@ public class EnableAllocationDeciderIT extends ESIntegTestCase {
test = assertAllShardsOnNodes("test", firstNode, secondNode);
assertThat("index: [test] expected to be rebalanced on both nodes", test.size(), equalTo(2));
}
/**
* Tests that updating the {@link SameShardAllocationDecider#CLUSTER_ROUTING_ALLOCATION_SAME_HOST_SETTING} setting works as expected.
*/
public void testUpdateSameHostSetting() {
internalCluster().startNodes(2);
// same same_host to true, since 2 nodes are started on the same host,
// only primaries should be assigned
client().admin().cluster().prepareUpdateSettings().setTransientSettings(
Settings.builder().put(CLUSTER_ROUTING_ALLOCATION_SAME_HOST_SETTING.getKey(), true)
).get();
final String indexName = "idx";
client().admin().indices().prepareCreate(indexName).setSettings(
Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
).get();
ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
assertFalse("replica should be unassigned",
clusterState.getRoutingTable().index(indexName).shardsWithState(ShardRoutingState.UNASSIGNED).isEmpty());
// now, update the same_host setting to allow shards to be allocated to multiple nodes on
// the same host - the replica should get assigned
client().admin().cluster().prepareUpdateSettings().setTransientSettings(
Settings.builder().put(CLUSTER_ROUTING_ALLOCATION_SAME_HOST_SETTING.getKey(), false)
).get();
clusterState = client().admin().cluster().prepareState().get().getState();
assertTrue("all shards should be assigned",
clusterState.getRoutingTable().index(indexName).shardsWithState(ShardRoutingState.UNASSIGNED).isEmpty());
}
}

View File

@ -42,6 +42,7 @@ import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.cluster.routing.allocation.decider.SameShardAllocationDecider;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.set.Sets;
@ -209,7 +210,9 @@ public class ReplicaShardAllocatorTests extends ESAllocationTestCase {
*/
public void testThrottleWhenAllocatingToMatchingNode() {
RoutingAllocation allocation = onePrimaryOnNode1And1Replica(new AllocationDeciders(Settings.EMPTY,
Arrays.asList(new TestAllocateDecision(Decision.YES), new SameShardAllocationDecider(Settings.EMPTY),
Arrays.asList(new TestAllocateDecision(Decision.YES),
new SameShardAllocationDecider(
Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)),
new AllocationDecider(Settings.EMPTY) {
@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {

View File

@ -117,15 +117,15 @@ public class ClusterStateChanges extends AbstractComponent {
public ClusterStateChanges(NamedXContentRegistry xContentRegistry, ThreadPool threadPool) {
super(Settings.builder().put(PATH_HOME_SETTING.getKey(), "dummy").build());
ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
allocationService = new AllocationService(settings, new AllocationDeciders(settings,
new HashSet<>(Arrays.asList(new SameShardAllocationDecider(settings),
new HashSet<>(Arrays.asList(new SameShardAllocationDecider(settings, clusterSettings),
new ReplicaAfterPrimaryActiveAllocationDecider(settings),
new RandomAllocationDeciderTests.RandomAllocationDecider(getRandom())))),
new TestGatewayAllocator(), new BalancedShardsAllocator(settings),
EmptyClusterInfoService.INSTANCE);
shardFailedClusterStateTaskExecutor = new ShardStateAction.ShardFailedClusterStateTaskExecutor(allocationService, null, logger);
shardStartedClusterStateTaskExecutor = new ShardStateAction.ShardStartedClusterStateTaskExecutor(allocationService, logger);
ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
ActionFilters actionFilters = new ActionFilters(Collections.emptySet());
IndexNameExpressionResolver indexNameExpressionResolver = new IndexNameExpressionResolver(settings);
DestructiveOperations destructiveOperations = new DestructiveOperations(settings, clusterSettings);

View File

@ -129,7 +129,8 @@ public abstract class ESAllocationTestCase extends ESTestCase {
protected static AllocationDeciders yesAllocationDeciders() {
return new AllocationDeciders(Settings.EMPTY, Arrays.asList(
new TestAllocateDecision(Decision.YES),
new SameShardAllocationDecider(Settings.EMPTY)));
new SameShardAllocationDecider(Settings.EMPTY,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))));
}
protected static AllocationDeciders noAllocationDeciders() {
@ -139,7 +140,8 @@ public abstract class ESAllocationTestCase extends ESTestCase {
protected static AllocationDeciders throttleAllocationDeciders() {
return new AllocationDeciders(Settings.EMPTY, Arrays.asList(
new TestAllocateDecision(Decision.THROTTLE),
new SameShardAllocationDecider(Settings.EMPTY)));
new SameShardAllocationDecider(Settings.EMPTY,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))));
}
protected ClusterState applyStartedShardsUntilNoChange(ClusterState clusterState, AllocationService service) {