From 5052282ab619db729426aa85470fc181260d12fe Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Thu, 8 Sep 2011 14:29:08 +0300 Subject: [PATCH] Cluster / Index level allocation filtering, closes #1311. --- .../cluster/metadata/IndexMetaData.java | 26 +++ .../cluster/node/DiscoveryNodeFilters.java | 90 +++++++++ .../routing/allocation/AllocationService.java | 25 +++ .../allocator/EvenShardsCountAllocator.java | 27 +++ .../allocation/allocator/ShardsAllocator.java | 4 + .../allocator/ShardsAllocators.java | 6 + .../allocation/decider/AllocationDecider.java | 7 + .../decider/AllocationDeciders.java | 13 ++ .../decider/AllocationDecidersModule.java | 1 + .../decider/FilterAllocationDecider.java | 115 ++++++++++++ .../allocation/FilterRoutingTests.java | 171 ++++++++++++++++++ .../allocation/RoutingAllocationTests.java | 6 + 12 files changed, 491 insertions(+) create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodeFilters.java create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/FilterAllocationDecider.java create mode 100644 modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/FilterRoutingTests.java diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java index 9a017bb9849..1dc012b5a66 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java @@ -20,6 +20,8 @@ package org.elasticsearch.cluster.metadata; import org.elasticsearch.ElasticSearchIllegalStateException; +import org.elasticsearch.cluster.node.DiscoveryNodeFilters; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Preconditions; import org.elasticsearch.common.collect.ImmutableMap; import org.elasticsearch.common.collect.ImmutableSet; @@ -123,6 +125,9 @@ public class IndexMetaData { private transient final int totalNumberOfShards; + private final DiscoveryNodeFilters includeFilters; + private final DiscoveryNodeFilters excludeFilters; + private IndexMetaData(String index, State state, Settings settings, ImmutableMap mappings, ImmutableMap aliases) { Preconditions.checkArgument(settings.getAsInt(SETTING_NUMBER_OF_SHARDS, -1) != -1, "must specify numberOfShards for index [" + index + "]"); Preconditions.checkArgument(settings.getAsInt(SETTING_NUMBER_OF_REPLICAS, -1) != -1, "must specify numberOfReplicas for index [" + index + "]"); @@ -133,6 +138,19 @@ public class IndexMetaData { this.totalNumberOfShards = numberOfShards() * (numberOfReplicas() + 1); this.aliases = aliases; + + ImmutableMap includeMap = settings.getByPrefix("index.routing.allocation.include.").getAsMap(); + if (includeMap.isEmpty()) { + includeFilters = null; + } else { + includeFilters = DiscoveryNodeFilters.buildFromKeyValue(includeMap); + } + ImmutableMap excludeMap = settings.getByPrefix("index.routing.allocation.exclude.").getAsMap(); + if (excludeMap.isEmpty()) { + excludeFilters = null; + } else { + excludeFilters = DiscoveryNodeFilters.buildFromKeyValue(excludeMap); + } } public String index() { @@ -203,6 +221,14 @@ public class IndexMetaData { return mappings.get(mappingType); } + @Nullable public DiscoveryNodeFilters includeFilters() { + return includeFilters; + } + + @Nullable public DiscoveryNodeFilters excludeFilters() { + return excludeFilters; + } + public static Builder newIndexMetaDataBuilder(String index) { return new Builder(index); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodeFilters.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodeFilters.java new file mode 100644 index 00000000000..c41022271fb --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodeFilters.java @@ -0,0 +1,90 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.node; + +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.collect.ImmutableMap; +import org.elasticsearch.common.regex.Regex; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.InetSocketTransportAddress; + +import java.util.HashMap; +import java.util.Map; + +/** + */ +public class DiscoveryNodeFilters { + + public static final DiscoveryNodeFilters NO_FILTERS = new DiscoveryNodeFilters(ImmutableMap.of()); + + public static DiscoveryNodeFilters buildFromSettings(String prefix, Settings settings) { + return buildFromKeyValue(settings.getByPrefix(prefix).getAsMap()); + } + + public static DiscoveryNodeFilters buildFromKeyValue(Map filters) { + Map bFilters = new HashMap(); + for (Map.Entry entry : filters.entrySet()) { + bFilters.put(entry.getKey(), Strings.splitStringByCommaToArray(entry.getValue())); + } + if (bFilters.isEmpty()) { + return NO_FILTERS; + } + return new DiscoveryNodeFilters(bFilters); + } + + private final Map filters; + + DiscoveryNodeFilters(Map filters) { + this.filters = filters; + } + + public boolean match(DiscoveryNode node) { + if (filters.isEmpty()) { + return true; + } + for (Map.Entry entry : filters.entrySet()) { + String attr = entry.getKey(); + String[] values = entry.getValue(); + if ("_ip".equals(attr)) { + if (!(node.address() instanceof InetSocketTransportAddress)) { + return false; + } + InetSocketTransportAddress inetAddress = (InetSocketTransportAddress) node.address(); + for (String value : values) { + if (!Regex.simpleMatch(value, inetAddress.address().getAddress().getHostAddress())) { + return false; + } + } + } else { + String nodeAttributeValue = node.attributes().get(attr); + if (nodeAttributeValue == null) { + return false; + } + for (String value : values) { + if (Regex.simpleMatch(value, nodeAttributeValue)) { + return true; + } + } + return false; + } + } + return true; + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java index 5b49815f2e6..dfe7c6f496f 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java @@ -164,12 +164,37 @@ public class AllocationService extends AbstractComponent { changed |= electPrimaries(allocation.routingNodes()); } + // move shards that no longer can be allocated + changed |= moveShards(allocation); + // rebalance changed |= shardsAllocators.rebalance(allocation); return changed; } + private boolean moveShards(RoutingAllocation allocation) { + boolean changed = false; + for (RoutingNode routingNode : allocation.routingNodes()) { + for (MutableShardRouting shardRouting : routingNode) { + // we can only move started shards... + if (!shardRouting.started()) { + continue; + } + if (!allocation.deciders().canRemain(shardRouting, routingNode, allocation)) { + logger.debug("[{}][{}] allocated on [{}], but can no longer be allocated on it, moving...", shardRouting.index(), shardRouting.id(), routingNode.node()); + boolean moved = shardsAllocators.move(shardRouting, routingNode, allocation); + if (!moved) { + logger.debug("[{}][{}] can't move", shardRouting.index(), shardRouting.id()); + } else { + changed = true; + } + } + } + } + return changed; + } + private boolean electPrimaries(RoutingNodes routingNodes) { boolean changed = false; for (MutableShardRouting shardEntry : routingNodes.unassigned()) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/EvenShardsCountAllocator.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/EvenShardsCountAllocator.java index a47d44cef0e..b9c4c7cae52 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/EvenShardsCountAllocator.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/EvenShardsCountAllocator.java @@ -152,4 +152,31 @@ public class EvenShardsCountAllocator extends AbstractComponent implements Shard } while (relocationPerformed); return changed; } + + @Override public boolean move(MutableShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { + assert shardRouting.started(); + boolean changed = false; + List sortedNodesLeastToHigh = allocation.routingNodes().sortedNodesLeastToHigh(); + if (sortedNodesLeastToHigh.isEmpty()) { + return false; + } + + for (RoutingNode nodeToCheck : sortedNodesLeastToHigh) { + // check if its the node we are moving from, no sense to check on it + if (nodeToCheck.nodeId().equals(node.nodeId())) { + continue; + } + if (allocation.deciders().canAllocate(shardRouting, nodeToCheck, allocation).allocate()) { + nodeToCheck.add(new MutableShardRouting(shardRouting.index(), shardRouting.id(), + nodeToCheck.nodeId(), shardRouting.currentNodeId(), + shardRouting.primary(), INITIALIZING, shardRouting.version() + 1)); + + shardRouting.relocate(nodeToCheck.nodeId()); + changed = true; + break; + } + } + + return changed; + } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocator.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocator.java index 25ef9d21a55..bd72d42fe5f 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocator.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocator.java @@ -19,6 +19,8 @@ package org.elasticsearch.cluster.routing.allocation.allocator; +import org.elasticsearch.cluster.routing.MutableShardRouting; +import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation; @@ -34,4 +36,6 @@ public interface ShardsAllocator { boolean allocateUnassigned(RoutingAllocation allocation); boolean rebalance(RoutingAllocation allocation); + + boolean move(MutableShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocators.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocators.java index 8dd4e00c572..b160838e1bb 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocators.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocators.java @@ -19,6 +19,8 @@ package org.elasticsearch.cluster.routing.allocation.allocator; +import org.elasticsearch.cluster.routing.MutableShardRouting; +import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation; @@ -69,4 +71,8 @@ public class ShardsAllocators extends AbstractComponent implements ShardsAllocat @Override public boolean rebalance(RoutingAllocation allocation) { return allocator.rebalance(allocation); } + + @Override public boolean move(MutableShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { + return allocator.move(shardRouting, node, allocation); + } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecider.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecider.java index 7c53cd68fe5..22c3abdb1e7 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecider.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecider.java @@ -63,4 +63,11 @@ public abstract class AllocationDecider extends AbstractComponent { public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { return Decision.YES; } + + /** + * Can the provided shard routing remain on the node? + */ + public boolean canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { + return true; + } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDeciders.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDeciders.java index 32d799179f4..f157da76735 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDeciders.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDeciders.java @@ -39,6 +39,7 @@ public class AllocationDeciders extends AllocationDecider { public AllocationDeciders(Settings settings, NodeSettingsService nodeSettingsService) { this(settings, ImmutableSet.builder() .add(new SameShardAllocationDecider(settings)) + .add(new FilterAllocationDecider(settings, nodeSettingsService)) .add(new ReplicaAfterPrimaryActiveAllocationDecider(settings)) .add(new ThrottlingAllocationDecider(settings, nodeSettingsService)) .add(new RebalanceOnlyWhenActiveAllocationDecider(settings)) @@ -79,4 +80,16 @@ public class AllocationDeciders extends AllocationDecider { } return ret; } + + @Override public boolean canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { + if (allocation.shouldIgnoreShardForNode(shardRouting.shardId(), node.nodeId())) { + return false; + } + for (AllocationDecider allocation1 : allocations) { + if (!allocation1.canRemain(shardRouting, node, allocation)) { + return false; + } + } + return true; + } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecidersModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecidersModule.java index 8a06ea585a6..be318f9eca4 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecidersModule.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecidersModule.java @@ -41,6 +41,7 @@ public class AllocationDecidersModule extends AbstractModule { @Override protected void configure() { Multibinder allocationMultibinder = Multibinder.newSetBinder(binder(), AllocationDecider.class); allocationMultibinder.addBinding().to(SameShardAllocationDecider.class); + allocationMultibinder.addBinding().to(FilterAllocationDecider.class); allocationMultibinder.addBinding().to(ReplicaAfterPrimaryActiveAllocationDecider.class); allocationMultibinder.addBinding().to(ThrottlingAllocationDecider.class); allocationMultibinder.addBinding().to(RebalanceOnlyWhenActiveAllocationDecider.class); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/FilterAllocationDecider.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/FilterAllocationDecider.java new file mode 100644 index 00000000000..58fd548ece8 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/FilterAllocationDecider.java @@ -0,0 +1,115 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.routing.allocation.decider; + +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNodeFilters; +import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; +import org.elasticsearch.common.collect.ImmutableMap; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.node.settings.NodeSettingsService; + +/** + */ +public class FilterAllocationDecider extends AllocationDecider { + + static { + MetaData.addDynamicSettings( + "cluster.routing.allocation.include.*", + "cluster.routing.allocation.exclude.*" + ); + IndexMetaData.addDynamicSettings( + "index.routing.allocation.include.*", + "index.routing.allocation.exclude.*" + ); + } + + private volatile DiscoveryNodeFilters clusterIncludeFilters; + private volatile DiscoveryNodeFilters clusterExcludeFilters; + + @Inject public FilterAllocationDecider(Settings settings, NodeSettingsService nodeSettingsService) { + super(settings); + ImmutableMap includeMap = settings.getByPrefix("cluster.routing.allocation.include.").getAsMap(); + if (includeMap.isEmpty()) { + clusterIncludeFilters = null; + } else { + clusterIncludeFilters = DiscoveryNodeFilters.buildFromKeyValue(includeMap); + } + ImmutableMap excludeMap = settings.getByPrefix("cluster.routing.allocation.exclude.").getAsMap(); + if (excludeMap.isEmpty()) { + clusterExcludeFilters = null; + } else { + clusterExcludeFilters = DiscoveryNodeFilters.buildFromKeyValue(excludeMap); + } + nodeSettingsService.addListener(new ApplySettings()); + } + + @Override public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { + return shouldFilter(shardRouting, node, allocation) ? Decision.NO : Decision.YES; + } + + @Override public boolean canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { + return !shouldFilter(shardRouting, node, allocation); + } + + private boolean shouldFilter(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { + if (clusterIncludeFilters != null) { + if (!clusterIncludeFilters.match(node.node())) { + return true; + } + } + if (clusterExcludeFilters != null) { + if (clusterExcludeFilters.match(node.node())) { + return true; + } + } + + IndexMetaData indexMd = allocation.routingNodes().metaData().index(shardRouting.index()); + if (indexMd.includeFilters() != null) { + if (!indexMd.includeFilters().match(node.node())) { + return true; + } + } + if (indexMd.excludeFilters() != null) { + if (indexMd.excludeFilters().match(node.node())) { + return true; + } + } + + return false; + } + + class ApplySettings implements NodeSettingsService.Listener { + @Override public void onRefreshSettings(Settings settings) { + ImmutableMap includeMap = settings.getByPrefix("cluster.routing.allocation.include.").getAsMap(); + if (!includeMap.isEmpty()) { + clusterIncludeFilters = DiscoveryNodeFilters.buildFromKeyValue(includeMap); + } + ImmutableMap excludeMap = settings.getByPrefix("cluster.routing.allocation.exclude.").getAsMap(); + if (!excludeMap.isEmpty()) { + clusterExcludeFilters = DiscoveryNodeFilters.buildFromKeyValue(excludeMap); + } + } + } +} diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/FilterRoutingTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/FilterRoutingTests.java new file mode 100644 index 00000000000..c83a446fe56 --- /dev/null +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/FilterRoutingTests.java @@ -0,0 +1,171 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.routing.allocation; + +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.routing.MutableShardRouting; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.common.collect.ImmutableMap; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.logging.Loggers; +import org.hamcrest.Matchers; +import org.testng.annotations.Test; + +import java.util.List; + +import static org.elasticsearch.cluster.ClusterState.*; +import static org.elasticsearch.cluster.metadata.IndexMetaData.*; +import static org.elasticsearch.cluster.metadata.MetaData.*; +import static org.elasticsearch.cluster.node.DiscoveryNodes.*; +import static org.elasticsearch.cluster.routing.RoutingBuilders.*; +import static org.elasticsearch.cluster.routing.ShardRoutingState.*; +import static org.elasticsearch.cluster.routing.allocation.RoutingAllocationTests.*; +import static org.elasticsearch.common.settings.ImmutableSettings.*; +import static org.hamcrest.MatcherAssert.*; +import static org.hamcrest.Matchers.*; + +/** + */ +@Test +public class FilterRoutingTests { + + private final ESLogger logger = Loggers.getLogger(FilterRoutingTests.class); + + @Test public void testClusterFilters() { + AllocationService strategy = new AllocationService(settingsBuilder() + .put("cluster.routing.allocation.include.tag1", "value1,value2") + .put("cluster.routing.allocation.exclude.tag1", "value3,value4") + .build()); + + logger.info("Building initial routing table"); + + MetaData metaData = newMetaDataBuilder() + .put(newIndexMetaDataBuilder("test").numberOfShards(2).numberOfReplicas(1)) + .build(); + + RoutingTable routingTable = routingTable() + .add(indexRoutingTable("test").initializeEmpty(metaData.index("test"))) + .build(); + + ClusterState clusterState = newClusterStateBuilder().metaData(metaData).routingTable(routingTable).build(); + + logger.info("--> adding two nodes and performing rerouting"); + clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder() + .put(newNode("node1", ImmutableMap.of("tag1", "value1"))) + .put(newNode("node2", ImmutableMap.of("tag1", "value2"))) + .put(newNode("node3", ImmutableMap.of("tag1", "value3"))) + .put(newNode("node4", ImmutableMap.of("tag1", "value4"))) + ).build(); + routingTable = strategy.reroute(clusterState).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(2)); + + logger.info("--> start the shards (primaries)"); + routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + logger.info("--> start the shards (replicas)"); + routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + logger.info("--> make sure shards are only allocated on tag1 with value1 and value2"); + List startedShards = clusterState.routingNodes().shardsWithState(ShardRoutingState.STARTED); + assertThat(startedShards.size(), equalTo(4)); + for (MutableShardRouting startedShard : startedShards) { + assertThat(startedShard.currentNodeId(), Matchers.anyOf(equalTo("node1"), equalTo("node2"))); + } + } + + @Test public void testIndexFilters() { + AllocationService strategy = new AllocationService(settingsBuilder() + .build()); + + logger.info("Building initial routing table"); + + MetaData metaData = newMetaDataBuilder() + .put(newIndexMetaDataBuilder("test").settings(settingsBuilder() + .put("index.number_of_shards", 2) + .put("index.number_of_replicas", 1) + .put("index.routing.allocation.include.tag1", "value1,value2") + .put("index.routing.allocation.exclude.tag1", "value3,value4") + .build())) + .build(); + + RoutingTable routingTable = routingTable() + .add(indexRoutingTable("test").initializeEmpty(metaData.index("test"))) + .build(); + + ClusterState clusterState = newClusterStateBuilder().metaData(metaData).routingTable(routingTable).build(); + + logger.info("--> adding two nodes and performing rerouting"); + clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder() + .put(newNode("node1", ImmutableMap.of("tag1", "value1"))) + .put(newNode("node2", ImmutableMap.of("tag1", "value2"))) + .put(newNode("node3", ImmutableMap.of("tag1", "value3"))) + .put(newNode("node4", ImmutableMap.of("tag1", "value4"))) + ).build(); + routingTable = strategy.reroute(clusterState).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(2)); + + logger.info("--> start the shards (primaries)"); + routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + logger.info("--> start the shards (replicas)"); + routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + logger.info("--> make sure shards are only allocated on tag1 with value1 and value2"); + List startedShards = clusterState.routingNodes().shardsWithState(ShardRoutingState.STARTED); + assertThat(startedShards.size(), equalTo(4)); + for (MutableShardRouting startedShard : startedShards) { + assertThat(startedShard.currentNodeId(), Matchers.anyOf(equalTo("node1"), equalTo("node2"))); + } + + logger.info("--> switch between value2 and value4, shards should be relocating"); + + metaData = newMetaDataBuilder() + .put(newIndexMetaDataBuilder("test").settings(settingsBuilder() + .put("index.number_of_shards", 2) + .put("index.number_of_replicas", 1) + .put("index.routing.allocation.include.tag1", "value1,value4") + .put("index.routing.allocation.exclude.tag1", "value2,value3") + .build())) + .build(); + clusterState = newClusterStateBuilder().state(clusterState).metaData(metaData).build(); + routingTable = strategy.reroute(clusterState).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(2)); + assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.RELOCATING).size(), equalTo(2)); + + logger.info("--> finish relocation"); + routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + startedShards = clusterState.routingNodes().shardsWithState(ShardRoutingState.STARTED); + assertThat(startedShards.size(), equalTo(4)); + for (MutableShardRouting startedShard : startedShards) { + assertThat(startedShard.currentNodeId(), Matchers.anyOf(equalTo("node1"), equalTo("node4"))); + } + } +} diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocationTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocationTests.java index 0735d94b679..314df22ba72 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocationTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocationTests.java @@ -22,9 +22,15 @@ package org.elasticsearch.cluster.routing.allocation; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.transport.DummyTransportAddress; +import java.util.Map; + public class RoutingAllocationTests { public static DiscoveryNode newNode(String nodeId) { return new DiscoveryNode(nodeId, DummyTransportAddress.INSTANCE); } + + public static DiscoveryNode newNode(String nodeId, Map attributes) { + return new DiscoveryNode("", nodeId, DummyTransportAddress.INSTANCE, attributes); + } } \ No newline at end of file