mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-03-09 14:34:43 +00:00
Allow to disable allocation on the index level
Similar to the global cluster wide disable allocation flags, allow to set those on a specific index by updating its settings. The keys are the same as the cluster one, except they start with an index, for example: index.routing.allocation.disable_allocation set to true. closes #3031
This commit is contained in:
parent
7b437e801a
commit
ae6c1b345f
src
main/java/org/elasticsearch
cluster/routing/allocation/decider
index/settings
test/java/org/elasticsearch/test
integration
unit/cluster
routing/allocation
settings
@ -57,6 +57,10 @@ public class DisableAllocationDecider extends AllocationDecider {
|
||||
public static final String CLUSTER_ROUTING_ALLOCATION_DISABLE_ALLOCATION = "cluster.routing.allocation.disable_allocation";
|
||||
public static final String CLUSTER_ROUTING_ALLOCATION_DISABLE_REPLICA_ALLOCATION = "cluster.routing.allocation.disable_replica_allocation";
|
||||
|
||||
public static final String INDEX_ROUTING_ALLOCATION_DISABLE_NEW_ALLOCATION = "index.routing.allocation.disable_new_allocation";
|
||||
public static final String INDEX_ROUTING_ALLOCATION_DISABLE_ALLOCATION = "index.routing.allocation.disable_allocation";
|
||||
public static final String INDEX_ROUTING_ALLOCATION_DISABLE_REPLICA_ALLOCATION = "index.routing.allocation.disable_replica_allocation";
|
||||
|
||||
class ApplySettings implements NodeSettingsService.Listener {
|
||||
@Override
|
||||
public void onRefreshSettings(Settings settings) {
|
||||
@ -96,16 +100,20 @@ public class DisableAllocationDecider extends AllocationDecider {
|
||||
|
||||
@Override
|
||||
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
|
||||
if (allocation.ignoreDisable()) {
|
||||
return Decision.YES;
|
||||
}
|
||||
Settings indexSettings = allocation.routingNodes().metaData().index(shardRouting.index()).settings();
|
||||
if (shardRouting.primary() && !allocation.routingNodes().routingTable().index(shardRouting.index()).shard(shardRouting.id()).primaryAllocatedPostApi()) {
|
||||
// if its primary, and it hasn't been allocated post API (meaning its a "fresh newly created shard"), only disable allocation
|
||||
// on a special disable allocation flag
|
||||
return allocation.ignoreDisable() ? Decision.YES : disableNewAllocation ? Decision.NO : Decision.YES;
|
||||
return indexSettings.getAsBoolean(INDEX_ROUTING_ALLOCATION_DISABLE_NEW_ALLOCATION, disableNewAllocation) ? Decision.NO : Decision.YES;
|
||||
}
|
||||
if (disableAllocation) {
|
||||
return allocation.ignoreDisable() ? Decision.YES : Decision.NO;
|
||||
if (indexSettings.getAsBoolean(INDEX_ROUTING_ALLOCATION_DISABLE_ALLOCATION, disableAllocation)) {
|
||||
return Decision.NO;
|
||||
}
|
||||
if (disableReplicaAllocation) {
|
||||
return shardRouting.primary() ? Decision.YES : allocation.ignoreDisable() ? Decision.YES : Decision.NO;
|
||||
if (indexSettings.getAsBoolean(INDEX_ROUTING_ALLOCATION_DISABLE_REPLICA_ALLOCATION, disableReplicaAllocation)) {
|
||||
return shardRouting.primary() ? Decision.YES : Decision.NO;
|
||||
}
|
||||
return Decision.YES;
|
||||
}
|
||||
|
@ -20,6 +20,7 @@
|
||||
package org.elasticsearch.index.settings;
|
||||
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.DisableAllocationDecider;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider;
|
||||
import org.elasticsearch.cluster.settings.DynamicSettings;
|
||||
@ -52,6 +53,9 @@ public class IndexDynamicSettingsModule extends AbstractModule {
|
||||
indexDynamicSettings.addDynamicSetting(FilterAllocationDecider.INDEX_ROUTING_REQUIRE_GROUP + "*");
|
||||
indexDynamicSettings.addDynamicSetting(FilterAllocationDecider.INDEX_ROUTING_INCLUDE_GROUP + "*");
|
||||
indexDynamicSettings.addDynamicSetting(FilterAllocationDecider.INDEX_ROUTING_EXCLUDE_GROUP + "*");
|
||||
indexDynamicSettings.addDynamicSetting(DisableAllocationDecider.INDEX_ROUTING_ALLOCATION_DISABLE_ALLOCATION);
|
||||
indexDynamicSettings.addDynamicSetting(DisableAllocationDecider.INDEX_ROUTING_ALLOCATION_DISABLE_NEW_ALLOCATION);
|
||||
indexDynamicSettings.addDynamicSetting(DisableAllocationDecider.INDEX_ROUTING_ALLOCATION_DISABLE_REPLICA_ALLOCATION);
|
||||
indexDynamicSettings.addDynamicSetting(FsTranslog.INDEX_TRANSLOG_FS_TYPE);
|
||||
indexDynamicSettings.addDynamicSetting(FsTranslog.INDEX_TRANSLOG_FS_BUFFER_SIZE, Validator.BYTES_SIZE);
|
||||
indexDynamicSettings.addDynamicSetting(FsTranslog.INDEX_TRANSLOG_FS_TRANSIENT_BUFFER_SIZE, Validator.BYTES_SIZE);
|
||||
|
@ -25,6 +25,7 @@ import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.routing.ShardRoutingState;
|
||||
import org.elasticsearch.cluster.routing.allocation.command.AllocateAllocationCommand;
|
||||
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.DisableAllocationDecider;
|
||||
import org.elasticsearch.common.Priority;
|
||||
import org.elasticsearch.common.io.FileSystemUtils;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
@ -66,8 +67,8 @@ public class ClusterRerouteTests extends AbstractNodesTests {
|
||||
@Test
|
||||
public void rerouteWithCommands() throws Exception {
|
||||
Settings commonSettings = settingsBuilder()
|
||||
.put("cluster.routing.allocation.disable_new_allocation", true)
|
||||
.put("cluster.routing.allocation.disable_allocation", true)
|
||||
.put(DisableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_DISABLE_NEW_ALLOCATION, true)
|
||||
.put(DisableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_DISABLE_ALLOCATION, true)
|
||||
.build();
|
||||
|
||||
startNode("node1", commonSettings);
|
||||
@ -129,8 +130,8 @@ public class ClusterRerouteTests extends AbstractNodesTests {
|
||||
@Test
|
||||
public void rerouteWithAllocateLocalGateway() throws Exception {
|
||||
Settings commonSettings = settingsBuilder()
|
||||
.put("cluster.routing.allocation.disable_new_allocation", true)
|
||||
.put("cluster.routing.allocation.disable_allocation", true)
|
||||
.put(DisableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_DISABLE_NEW_ALLOCATION, true)
|
||||
.put(DisableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_DISABLE_ALLOCATION, true)
|
||||
.put("gateway.type", "local")
|
||||
.build();
|
||||
|
||||
|
5
src/test/java/org/elasticsearch/test/integration/gateway/local/SimpleRecoveryLocalGatewayTests.java
5
src/test/java/org/elasticsearch/test/integration/gateway/local/SimpleRecoveryLocalGatewayTests.java
@ -26,6 +26,7 @@ import org.elasticsearch.action.admin.indices.status.IndicesStatusResponse;
|
||||
import org.elasticsearch.action.admin.indices.status.ShardStatus;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.DisableAllocationDecider;
|
||||
import org.elasticsearch.common.Priority;
|
||||
import org.elasticsearch.common.settings.ImmutableSettings;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
@ -375,7 +376,7 @@ public class SimpleRecoveryLocalGatewayTests extends AbstractNodesTests {
|
||||
|
||||
logger.info("--> shutting down the nodes");
|
||||
// Disable allocations while we are closing nodes
|
||||
client("node1").admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder().put("cluster.routing.allocation.disable_allocation", true)).execute().actionGet();
|
||||
client("node1").admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder().put(DisableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_DISABLE_ALLOCATION, true)).execute().actionGet();
|
||||
for (int i = 1; i < 5; i++) {
|
||||
closeNode("node" + i);
|
||||
}
|
||||
@ -394,7 +395,7 @@ public class SimpleRecoveryLocalGatewayTests extends AbstractNodesTests {
|
||||
|
||||
logger.info("--> shutting down the nodes");
|
||||
// Disable allocations while we are closing nodes
|
||||
client("node1").admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder().put("cluster.routing.allocation.disable_allocation", true)).execute().actionGet();
|
||||
client("node1").admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder().put(DisableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_DISABLE_ALLOCATION, true)).execute().actionGet();
|
||||
for (int i = 1; i < 5; i++) {
|
||||
closeNode("node" + i);
|
||||
}
|
||||
|
@ -30,6 +30,7 @@ import org.elasticsearch.cluster.routing.allocation.command.AllocateAllocationCo
|
||||
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands;
|
||||
import org.elasticsearch.cluster.routing.allocation.command.CancelAllocationCommand;
|
||||
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.DisableAllocationDecider;
|
||||
import org.elasticsearch.common.io.stream.BytesStreamInput;
|
||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
@ -106,8 +107,8 @@ public class AllocationCommandsTests {
|
||||
@Test
|
||||
public void allocateCommand() {
|
||||
AllocationService allocation = new AllocationService(settingsBuilder()
|
||||
.put("cluster.routing.allocation.disable_new_allocation", true)
|
||||
.put("cluster.routing.allocation.disable_allocation", true)
|
||||
.put(DisableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_DISABLE_NEW_ALLOCATION, true)
|
||||
.put(DisableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_DISABLE_ALLOCATION, true)
|
||||
.build());
|
||||
|
||||
logger.info("--> building initial routing table");
|
||||
@ -187,8 +188,8 @@ public class AllocationCommandsTests {
|
||||
@Test
|
||||
public void cancelCommand() {
|
||||
AllocationService allocation = new AllocationService(settingsBuilder()
|
||||
.put("cluster.routing.allocation.disable_new_allocation", true)
|
||||
.put("cluster.routing.allocation.disable_allocation", true)
|
||||
.put(DisableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_DISABLE_NEW_ALLOCATION, true)
|
||||
.put(DisableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_DISABLE_ALLOCATION, true)
|
||||
.build());
|
||||
|
||||
logger.info("--> building initial routing table");
|
||||
|
@ -23,8 +23,10 @@ import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.routing.RoutingTable;
|
||||
import org.elasticsearch.cluster.routing.allocation.AllocationService;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.DisableAllocationDecider;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.elasticsearch.common.settings.ImmutableSettings;
|
||||
import org.testng.annotations.Test;
|
||||
|
||||
import static org.elasticsearch.cluster.ClusterState.newClusterStateBuilder;
|
||||
@ -33,6 +35,7 @@ import static org.elasticsearch.cluster.metadata.MetaData.newMetaDataBuilder;
|
||||
import static org.elasticsearch.cluster.node.DiscoveryNodes.newNodesBuilder;
|
||||
import static org.elasticsearch.cluster.routing.RoutingBuilders.routingTable;
|
||||
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
|
||||
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
|
||||
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
|
||||
import static org.elasticsearch.test.unit.cluster.routing.allocation.RoutingAllocationTests.newNode;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
@ -46,10 +49,10 @@ public class DisableAllocationTests {
|
||||
private final ESLogger logger = Loggers.getLogger(DisableAllocationTests.class);
|
||||
|
||||
@Test
|
||||
public void testDisableAllocation() {
|
||||
public void testClusterDisableAllocation() {
|
||||
AllocationService strategy = new AllocationService(settingsBuilder()
|
||||
.put("cluster.routing.allocation.disable_new_allocation", true)
|
||||
.put("cluster.routing.allocation.disable_allocation", true)
|
||||
.put(DisableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_DISABLE_NEW_ALLOCATION, true)
|
||||
.put(DisableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_DISABLE_ALLOCATION, true)
|
||||
.build());
|
||||
|
||||
logger.info("Building initial routing table");
|
||||
@ -64,7 +67,7 @@ public class DisableAllocationTests {
|
||||
|
||||
ClusterState clusterState = newClusterStateBuilder().metaData(metaData).routingTable(routingTable).build();
|
||||
|
||||
logger.info("--> adding two nodes on same rack and do rerouting");
|
||||
logger.info("--> adding two nodes and do rerouting");
|
||||
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder()
|
||||
.put(newNode("node1"))
|
||||
.put(newNode("node2"))
|
||||
@ -76,7 +79,7 @@ public class DisableAllocationTests {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDisableReplicaAllocation() {
|
||||
public void testClusterDisableReplicaAllocation() {
|
||||
AllocationService strategy = new AllocationService(settingsBuilder()
|
||||
.put("cluster.routing.allocation.disable_replica_allocation", true)
|
||||
.build());
|
||||
@ -93,7 +96,7 @@ public class DisableAllocationTests {
|
||||
|
||||
ClusterState clusterState = newClusterStateBuilder().metaData(metaData).routingTable(routingTable).build();
|
||||
|
||||
logger.info("--> adding two nodes on same rack and do rerouting");
|
||||
logger.info("--> adding two nodes do rerouting");
|
||||
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder()
|
||||
.put(newNode("node1"))
|
||||
.put(newNode("node2"))
|
||||
@ -108,4 +111,42 @@ public class DisableAllocationTests {
|
||||
|
||||
assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(0));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIndexDisableAllocation() {
|
||||
AllocationService strategy = new AllocationService(settingsBuilder()
|
||||
.build());
|
||||
|
||||
MetaData metaData = newMetaDataBuilder()
|
||||
.put(newIndexMetaDataBuilder("disabled").settings(ImmutableSettings.builder().put(DisableAllocationDecider.INDEX_ROUTING_ALLOCATION_DISABLE_ALLOCATION, true).put(DisableAllocationDecider.INDEX_ROUTING_ALLOCATION_DISABLE_NEW_ALLOCATION, true)).numberOfShards(1).numberOfReplicas(1))
|
||||
.put(newIndexMetaDataBuilder("enabled").numberOfShards(1).numberOfReplicas(1))
|
||||
.build();
|
||||
|
||||
RoutingTable routingTable = routingTable()
|
||||
.addAsNew(metaData.index("disabled"))
|
||||
.addAsNew(metaData.index("enabled"))
|
||||
.build();
|
||||
|
||||
ClusterState clusterState = newClusterStateBuilder().metaData(metaData).routingTable(routingTable).build();
|
||||
|
||||
logger.info("--> adding two nodes and do rerouting");
|
||||
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder()
|
||||
.put(newNode("node1"))
|
||||
.put(newNode("node2"))
|
||||
).build();
|
||||
routingTable = strategy.reroute(clusterState).routingTable();
|
||||
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
|
||||
assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(1));
|
||||
logger.info("--> start the shards (primaries)");
|
||||
routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable();
|
||||
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
|
||||
logger.info("--> start the shards (replicas)");
|
||||
routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable();
|
||||
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
|
||||
|
||||
logger.info("--> verify only enabled index has been routed");
|
||||
assertThat(clusterState.readOnlyRoutingNodes().shardsWithState("enabled", STARTED).size(), equalTo(2));
|
||||
assertThat(clusterState.readOnlyRoutingNodes().shardsWithState("disabled", STARTED).size(), equalTo(0));
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -21,6 +21,7 @@ package org.elasticsearch.test.unit.cluster.settings;
|
||||
|
||||
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.DisableAllocationDecider;
|
||||
import org.elasticsearch.common.settings.ImmutableSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.node.Node;
|
||||
@ -32,21 +33,21 @@ import static org.hamcrest.Matchers.notNullValue;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
|
||||
public class ClusterSettingsTests {
|
||||
|
||||
|
||||
@Test
|
||||
public void clusterSettingsUpdateResponse() {
|
||||
Node node1 = NodeBuilder.nodeBuilder().node();
|
||||
Node node2 = NodeBuilder.nodeBuilder().node();
|
||||
|
||||
|
||||
node1.start();
|
||||
node2.start();
|
||||
|
||||
|
||||
Client client = node1.client();
|
||||
|
||||
String key1 = "indices.cache.filter.size";
|
||||
|
||||
String key1 = "indices.cache.filter.size";
|
||||
int value1 = 10;
|
||||
|
||||
String key2 = "cluster.routing.allocation.disable_allocation";
|
||||
String key2 = DisableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_DISABLE_ALLOCATION;
|
||||
boolean value2 = true;
|
||||
|
||||
Settings transientSettings1 = ImmutableSettings.builder().put(key1, value1).build();
|
||||
@ -58,7 +59,7 @@ public class ClusterSettingsTests {
|
||||
.setPersistentSettings(persistentSettings1)
|
||||
.execute()
|
||||
.actionGet();
|
||||
|
||||
|
||||
assertThat(response1.getTransientSettings().get(key1), notNullValue());
|
||||
assertThat(response1.getTransientSettings().get(key2), nullValue());
|
||||
assertThat(response1.getPersistentSettings().get(key1), nullValue());
|
||||
@ -73,7 +74,7 @@ public class ClusterSettingsTests {
|
||||
.setPersistentSettings(persistentSettings2)
|
||||
.execute()
|
||||
.actionGet();
|
||||
|
||||
|
||||
assertThat(response2.getTransientSettings().get(key1), notNullValue());
|
||||
assertThat(response2.getTransientSettings().get(key2), notNullValue());
|
||||
assertThat(response2.getPersistentSettings().get(key1), nullValue());
|
||||
@ -88,7 +89,7 @@ public class ClusterSettingsTests {
|
||||
.setPersistentSettings(persistentSettings3)
|
||||
.execute()
|
||||
.actionGet();
|
||||
|
||||
|
||||
assertThat(response3.getTransientSettings().get(key1), nullValue());
|
||||
assertThat(response3.getTransientSettings().get(key2), nullValue());
|
||||
assertThat(response3.getPersistentSettings().get(key1), notNullValue());
|
||||
|
Loading…
x
Reference in New Issue
Block a user