Cluster / Index level allocation filtering, closes #1311.

This commit is contained in:
Shay Banon 2011-09-08 14:29:08 +03:00
parent 4180a7f73a
commit 5052282ab6
12 changed files with 491 additions and 0 deletions

View File

@ -20,6 +20,8 @@
package org.elasticsearch.cluster.metadata;
import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.cluster.node.DiscoveryNodeFilters;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Preconditions;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.collect.ImmutableSet;
@ -123,6 +125,9 @@ public class IndexMetaData {
private transient final int totalNumberOfShards;
private final DiscoveryNodeFilters includeFilters;
private final DiscoveryNodeFilters excludeFilters;
private IndexMetaData(String index, State state, Settings settings, ImmutableMap<String, MappingMetaData> mappings, ImmutableMap<String, AliasMetaData> aliases) {
Preconditions.checkArgument(settings.getAsInt(SETTING_NUMBER_OF_SHARDS, -1) != -1, "must specify numberOfShards for index [" + index + "]");
Preconditions.checkArgument(settings.getAsInt(SETTING_NUMBER_OF_REPLICAS, -1) != -1, "must specify numberOfReplicas for index [" + index + "]");
@ -133,6 +138,19 @@ public class IndexMetaData {
this.totalNumberOfShards = numberOfShards() * (numberOfReplicas() + 1);
this.aliases = aliases;
ImmutableMap<String, String> includeMap = settings.getByPrefix("index.routing.allocation.include.").getAsMap();
if (includeMap.isEmpty()) {
includeFilters = null;
} else {
includeFilters = DiscoveryNodeFilters.buildFromKeyValue(includeMap);
}
ImmutableMap<String, String> excludeMap = settings.getByPrefix("index.routing.allocation.exclude.").getAsMap();
if (excludeMap.isEmpty()) {
excludeFilters = null;
} else {
excludeFilters = DiscoveryNodeFilters.buildFromKeyValue(excludeMap);
}
}
public String index() {
@ -203,6 +221,14 @@ public class IndexMetaData {
return mappings.get(mappingType);
}
/**
 * Returns the node filters built from this index's
 * {@code index.routing.allocation.include.*} settings.
 *
 * @return the include filters, or {@code null} when the index settings
 *         contain no entries under that prefix
 */
@Nullable public DiscoveryNodeFilters includeFilters() {
return includeFilters;
}
/**
 * Returns the node filters built from this index's
 * {@code index.routing.allocation.exclude.*} settings.
 *
 * @return the exclude filters, or {@code null} when the index settings
 *         contain no entries under that prefix
 */
@Nullable public DiscoveryNodeFilters excludeFilters() {
return excludeFilters;
}
/**
 * Static factory for a {@link Builder} bound to the given index name.
 *
 * @param index the index name handed to the {@link Builder} constructor
 * @return a new builder instance
 */
public static Builder newIndexMetaDataBuilder(String index) {
return new Builder(index);
}

View File

@ -0,0 +1,90 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.node;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import java.util.HashMap;
import java.util.Map;
/**
 * A set of filters that can be tested against a {@link DiscoveryNode}.
 *
 * <p>Filters are held as a map from a key to an array of accepted values. The key
 * is either the special name {@code _ip} (tested against the node's host address)
 * or the name of a node attribute (tested against {@code node.attributes()}).
 * Each value is compared with {@link Regex#simpleMatch(String, String)}, with the
 * configured value passed as the pattern.
 */
public class DiscoveryNodeFilters {

    /**
     * Shared instance that holds no filters; {@link #match(DiscoveryNode)} accepts every node.
     */
    public static final DiscoveryNodeFilters NO_FILTERS = new DiscoveryNodeFilters(ImmutableMap.<String, String[]>of());

    /**
     * Builds filters from every setting found under {@code prefix}.
     *
     * @param prefix   the settings prefix to select entries by
     * @param settings the settings to read from
     * @return the filters, or {@link #NO_FILTERS} when nothing is set under the prefix
     */
    public static DiscoveryNodeFilters buildFromSettings(String prefix, Settings settings) {
        return buildFromKeyValue(settings.getByPrefix(prefix).getAsMap());
    }

    /**
     * Builds filters from a key to raw-value map. Each raw value is split into an
     * array with {@code Strings.splitStringByCommaToArray}.
     *
     * @param filters filter key mapped to its raw (comma separated) value
     * @return the filters, or {@link #NO_FILTERS} when {@code filters} is empty
     */
    public static DiscoveryNodeFilters buildFromKeyValue(Map<String, String> filters) {
        Map<String, String[]> bFilters = new HashMap<String, String[]>();
        for (Map.Entry<String, String> entry : filters.entrySet()) {
            bFilters.put(entry.getKey(), Strings.splitStringByCommaToArray(entry.getValue()));
        }
        if (bFilters.isEmpty()) {
            return NO_FILTERS;
        }
        return new DiscoveryNodeFilters(bFilters);
    }

    private final Map<String, String[]> filters;

    DiscoveryNodeFilters(Map<String, String[]> filters) {
        this.filters = filters;
    }

    /**
     * Tests the node against the configured filters.
     *
     * <ul>
     * <li>With no filters configured the node always matches.</li>
     * <li>For the {@code _ip} key the node must expose an
     * {@link InetSocketTransportAddress} and its host address must match at least
     * one of the configured values; on success evaluation continues with the next
     * key, otherwise the node does not match.</li>
     * <li>For any other key the result is returned immediately: {@code true} when
     * the node attribute of that name matches at least one configured value,
     * {@code false} when the attribute is missing or matches none.</li>
     * </ul>
     *
     * @param node the node to test
     * @return {@code true} if the node matches
     */
    public boolean match(DiscoveryNode node) {
        if (filters.isEmpty()) {
            return true;
        }
        for (Map.Entry<String, String[]> entry : filters.entrySet()) {
            String attr = entry.getKey();
            String[] values = entry.getValue();
            if ("_ip".equals(attr)) {
                if (!(node.address() instanceof InetSocketTransportAddress)) {
                    return false;
                }
                InetSocketTransportAddress inetAddress = (InetSocketTransportAddress) node.address();
                String hostAddress = inetAddress.address().getAddress().getHostAddress();
                // A node has a single address, so the comma separated values are
                // alternatives: one hit is enough. Requiring every value to match
                // (the previous behaviour) made any list of distinct IPs unmatchable.
                if (!matchesAny(values, hostAddress)) {
                    return false;
                }
            } else {
                String nodeAttributeValue = node.attributes().get(attr);
                if (nodeAttributeValue == null) {
                    return false;
                }
                // NOTE(review): this branch returns for the first non-_ip key that is
                // iterated, so any remaining keys are not consulted and the outcome for
                // multi-key filters depends on map iteration order - confirm whether
                // keys are meant to be combined with AND or OR.
                return matchesAny(values, nodeAttributeValue);
            }
        }
        return true;
    }

    /**
     * Returns {@code true} when {@code candidate} matches at least one of {@code patterns}.
     */
    private static boolean matchesAny(String[] patterns, String candidate) {
        for (String pattern : patterns) {
            if (Regex.simpleMatch(pattern, candidate)) {
                return true;
            }
        }
        return false;
    }
}

View File

@ -164,12 +164,37 @@ public class AllocationService extends AbstractComponent {
changed |= electPrimaries(allocation.routingNodes());
}
// move shards that no longer can be allocated
changed |= moveShards(allocation);
// rebalance
changed |= shardsAllocators.rebalance(allocation);
return changed;
}
/**
 * Walks every shard on every routing node and asks the shards allocator to move
 * each started shard that the deciders no longer allow to remain where it is.
 *
 * @param allocation the allocation being computed
 * @return {@code true} if at least one shard was moved
 */
private boolean moveShards(RoutingAllocation allocation) {
    boolean anyMoved = false;
    for (RoutingNode currentNode : allocation.routingNodes()) {
        for (MutableShardRouting shard : currentNode) {
            // only started shards can be moved, and only those that may not stay need to be
            if (shard.started() && !allocation.deciders().canRemain(shard, currentNode, allocation)) {
                logger.debug("[{}][{}] allocated on [{}], but can no longer be allocated on it, moving...", shard.index(), shard.id(), currentNode.node());
                if (shardsAllocators.move(shard, currentNode, allocation)) {
                    anyMoved = true;
                } else {
                    logger.debug("[{}][{}] can't move", shard.index(), shard.id());
                }
            }
        }
    }
    return anyMoved;
}
private boolean electPrimaries(RoutingNodes routingNodes) {
boolean changed = false;
for (MutableShardRouting shardEntry : routingNodes.unassigned()) {

View File

@ -152,4 +152,31 @@ public class EvenShardsCountAllocator extends AbstractComponent implements Shard
} while (relocationPerformed);
return changed;
}
/**
 * Starts relocating a started shard from {@code node} to the first other node,
 * in the order returned by {@code sortedNodesLeastToHigh()}, on which the
 * deciders allow it to be allocated.
 *
 * @param shardRouting the started shard to move
 * @param node         the node the shard is moved away from
 * @param allocation   the allocation being computed
 * @return {@code true} if a target was found and the relocation was started
 */
@Override public boolean move(MutableShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
    assert shardRouting.started();
    List<RoutingNode> candidates = allocation.routingNodes().sortedNodesLeastToHigh();
    for (RoutingNode target : candidates) {
        // the source node itself is never a valid target
        if (target.nodeId().equals(node.nodeId())) {
            continue;
        }
        if (!allocation.deciders().canAllocate(shardRouting, target, allocation).allocate()) {
            continue;
        }
        // create the initializing copy on the target before flipping the source to relocating
        MutableShardRouting incoming = new MutableShardRouting(shardRouting.index(), shardRouting.id(),
                target.nodeId(), shardRouting.currentNodeId(),
                shardRouting.primary(), INITIALIZING, shardRouting.version() + 1);
        target.add(incoming);
        shardRouting.relocate(target.nodeId());
        return true;
    }
    return false;
}
}

View File

@ -19,6 +19,8 @@
package org.elasticsearch.cluster.routing.allocation.allocator;
import org.elasticsearch.cluster.routing.MutableShardRouting;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation;
@ -34,4 +36,6 @@ public interface ShardsAllocator {
boolean allocateUnassigned(RoutingAllocation allocation);
boolean rebalance(RoutingAllocation allocation);
/**
 * Asks the allocator to move the given shard away from the given node.
 *
 * @param shardRouting the shard to move
 * @param node         the node the shard currently sits on
 * @param allocation   the allocation being computed
 * @return whether the shard was moved (callers treat {@code true} as a routing change)
 */
boolean move(MutableShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation);
}

View File

@ -19,6 +19,8 @@
package org.elasticsearch.cluster.routing.allocation.allocator;
import org.elasticsearch.cluster.routing.MutableShardRouting;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation;
@ -69,4 +71,8 @@ public class ShardsAllocators extends AbstractComponent implements ShardsAllocat
/**
 * Forwards the rebalance request to the wrapped {@code allocator} and returns its result.
 */
@Override public boolean rebalance(RoutingAllocation allocation) {
return allocator.rebalance(allocation);
}
/**
 * Forwards the move request to the wrapped {@code allocator} and returns its result.
 */
@Override public boolean move(MutableShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return allocator.move(shardRouting, node, allocation);
}
}

View File

@ -63,4 +63,11 @@ public abstract class AllocationDecider extends AbstractComponent {
/**
 * Can the provided shard routing be allocated on the node?
 *
 * <p>This base implementation imposes no restriction and always answers
 * {@link Decision#YES}.
 */
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return Decision.YES;
}
/**
 * Can the provided shard routing remain on the node?
 *
 * <p>This base implementation imposes no restriction and always answers
 * {@code true}.
 *
 * @param shardRouting the shard being checked
 * @param node         the node the shard is checked against
 * @param allocation   the allocation being computed
 * @return {@code true} if the shard may stay on the node
 */
public boolean canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return true;
}
}

View File

@ -39,6 +39,7 @@ public class AllocationDeciders extends AllocationDecider {
public AllocationDeciders(Settings settings, NodeSettingsService nodeSettingsService) {
this(settings, ImmutableSet.<AllocationDecider>builder()
.add(new SameShardAllocationDecider(settings))
.add(new FilterAllocationDecider(settings, nodeSettingsService))
.add(new ReplicaAfterPrimaryActiveAllocationDecider(settings))
.add(new ThrottlingAllocationDecider(settings, nodeSettingsService))
.add(new RebalanceOnlyWhenActiveAllocationDecider(settings))
@ -79,4 +80,16 @@ public class AllocationDeciders extends AllocationDecider {
}
return ret;
}
/**
 * A shard may remain on a node only if the allocation does not ask for that
 * shard to be ignored on the node and every registered decider agrees.
 *
 * @return {@code false} as soon as the shard is ignored for the node or one
 *         decider refuses; {@code true} otherwise
 */
@Override public boolean canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
    boolean ignoredForNode = allocation.shouldIgnoreShardForNode(shardRouting.shardId(), node.nodeId());
    if (ignoredForNode) {
        return false;
    }
    for (AllocationDecider decider : allocations) {
        boolean mayStay = decider.canRemain(shardRouting, node, allocation);
        if (!mayStay) {
            // a single veto is enough, the remaining deciders are not consulted
            return false;
        }
    }
    return true;
}
}

View File

@ -41,6 +41,7 @@ public class AllocationDecidersModule extends AbstractModule {
@Override protected void configure() {
Multibinder<AllocationDecider> allocationMultibinder = Multibinder.newSetBinder(binder(), AllocationDecider.class);
allocationMultibinder.addBinding().to(SameShardAllocationDecider.class);
allocationMultibinder.addBinding().to(FilterAllocationDecider.class);
allocationMultibinder.addBinding().to(ReplicaAfterPrimaryActiveAllocationDecider.class);
allocationMultibinder.addBinding().to(ThrottlingAllocationDecider.class);
allocationMultibinder.addBinding().to(RebalanceOnlyWhenActiveAllocationDecider.class);

View File

@ -0,0 +1,115 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.routing.allocation.decider;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodeFilters;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.node.settings.NodeSettingsService;
/**
 * Allocation decider driven by include / exclude node filters.
 *
 * <p>Cluster level filters are read from {@code cluster.routing.allocation.include.*}
 * and {@code cluster.routing.allocation.exclude.*}; index level filters are taken
 * from the {@link IndexMetaData} of the shard's index. A node is rejected when an
 * include filter is set and does not match it, or when an exclude filter is set
 * and matches it.
 */
public class FilterAllocationDecider extends AllocationDecider {

    static {
        // register the filter settings as dynamic, cluster level first, then index level
        MetaData.addDynamicSettings(
                "cluster.routing.allocation.include.*",
                "cluster.routing.allocation.exclude.*"
        );
        IndexMetaData.addDynamicSettings(
                "index.routing.allocation.include.*",
                "index.routing.allocation.exclude.*"
        );
    }

    // null means "no filter of this kind configured"; replaced by ApplySettings
    private volatile DiscoveryNodeFilters clusterIncludeFilters;
    private volatile DiscoveryNodeFilters clusterExcludeFilters;

    /**
     * Reads the initial cluster level filters from {@code settings} and registers
     * a listener with {@code nodeSettingsService} to pick up later changes.
     */
    @Inject public FilterAllocationDecider(Settings settings, NodeSettingsService nodeSettingsService) {
        super(settings);
        clusterIncludeFilters = filtersOrNull(settings, "cluster.routing.allocation.include.");
        clusterExcludeFilters = filtersOrNull(settings, "cluster.routing.allocation.exclude.");
        nodeSettingsService.addListener(new ApplySettings());
    }

    /**
     * Answers {@link Decision#NO} when the node is filtered out for the shard,
     * {@link Decision#YES} otherwise.
     */
    @Override public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
        if (shouldFilter(shardRouting, node, allocation)) {
            return Decision.NO;
        }
        return Decision.YES;
    }

    /**
     * A shard can remain on a node exactly when the node is not filtered out for it.
     */
    @Override public boolean canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
        boolean filtered = shouldFilter(shardRouting, node, allocation);
        return !filtered;
    }

    /**
     * Checks the cluster level filters first and, if they let the node through,
     * the filters of the shard's index.
     *
     * @return {@code true} if the node must not hold the shard
     */
    private boolean shouldFilter(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
        if (rejects(clusterIncludeFilters, clusterExcludeFilters, node)) {
            return true;
        }
        IndexMetaData indexMd = allocation.routingNodes().metaData().index(shardRouting.index());
        return rejects(indexMd.includeFilters(), indexMd.excludeFilters(), node);
    }

    /**
     * Applies one include / exclude pair to the node; either filter may be {@code null}.
     *
     * @return {@code true} if the include filter is set and misses the node, or
     *         the exclude filter is set and hits it
     */
    private static boolean rejects(DiscoveryNodeFilters include, DiscoveryNodeFilters exclude, RoutingNode node) {
        if (include != null && !include.match(node.node())) {
            return true;
        }
        return exclude != null && exclude.match(node.node());
    }

    /**
     * Builds filters from the settings under {@code prefix}.
     *
     * @return the filters, or {@code null} when nothing is set under the prefix
     */
    private static DiscoveryNodeFilters filtersOrNull(Settings settings, String prefix) {
        ImmutableMap<String, String> byPrefix = settings.getByPrefix(prefix).getAsMap();
        return byPrefix.isEmpty() ? null : DiscoveryNodeFilters.buildFromKeyValue(byPrefix);
    }

    /**
     * Replaces the cluster level filters when refreshed settings carry new ones.
     * A refresh without entries for a prefix leaves the current filter untouched.
     */
    class ApplySettings implements NodeSettingsService.Listener {
        @Override public void onRefreshSettings(Settings settings) {
            DiscoveryNodeFilters refreshedInclude = filtersOrNull(settings, "cluster.routing.allocation.include.");
            if (refreshedInclude != null) {
                clusterIncludeFilters = refreshedInclude;
            }
            DiscoveryNodeFilters refreshedExclude = filtersOrNull(settings, "cluster.routing.allocation.exclude.");
            if (refreshedExclude != null) {
                clusterExcludeFilters = refreshedExclude;
            }
        }
    }
}

View File

@ -0,0 +1,171 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.routing.allocation;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.MutableShardRouting;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.hamcrest.Matchers;
import org.testng.annotations.Test;
import java.util.List;
import static org.elasticsearch.cluster.ClusterState.*;
import static org.elasticsearch.cluster.metadata.IndexMetaData.*;
import static org.elasticsearch.cluster.metadata.MetaData.*;
import static org.elasticsearch.cluster.node.DiscoveryNodes.*;
import static org.elasticsearch.cluster.routing.RoutingBuilders.*;
import static org.elasticsearch.cluster.routing.ShardRoutingState.*;
import static org.elasticsearch.cluster.routing.allocation.RoutingAllocationTests.*;
import static org.elasticsearch.common.settings.ImmutableSettings.*;
import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*;
/**
 * Tests for include / exclude allocation filtering, configured once at cluster
 * level and once at index level. Both tests use one index with 2 shards and
 * 1 replica and four nodes tagged {@code tag1=value1} .. {@code tag1=value4}.
 */
@Test
public class FilterRoutingTests {

    private final ESLogger logger = Loggers.getLogger(FilterRoutingTests.class);

    /**
     * Cluster level filters: include value1/value2, exclude value3/value4.
     * All four shard copies must end up on node1 or node2.
     */
    @Test public void testClusterFilters() {
        AllocationService strategy = new AllocationService(settingsBuilder()
                .put("cluster.routing.allocation.include.tag1", "value1,value2")
                .put("cluster.routing.allocation.exclude.tag1", "value3,value4")
                .build());

        logger.info("Building initial routing table");

        MetaData metaData = newMetaDataBuilder()
                .put(newIndexMetaDataBuilder("test").numberOfShards(2).numberOfReplicas(1))
                .build();

        RoutingTable routingTable = routingTable()
                .add(indexRoutingTable("test").initializeEmpty(metaData.index("test")))
                .build();

        ClusterState clusterState = newClusterStateBuilder().metaData(metaData).routingTable(routingTable).build();

        // four nodes are registered here, one per tag value
        logger.info("--> adding four nodes and performing rerouting");
        clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder()
                .put(newNode("node1", ImmutableMap.of("tag1", "value1")))
                .put(newNode("node2", ImmutableMap.of("tag1", "value2")))
                .put(newNode("node3", ImmutableMap.of("tag1", "value3")))
                .put(newNode("node4", ImmutableMap.of("tag1", "value4")))
        ).build();
        routingTable = strategy.reroute(clusterState).routingTable();
        clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
        // only two shard copies are initializing after the first reroute
        assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(2));

        logger.info("--> start the shards (primaries)");
        routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable();
        clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();

        logger.info("--> start the shards (replicas)");
        routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable();
        clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();

        logger.info("--> make sure shards are only allocated on tag1 with value1 and value2");
        List<MutableShardRouting> startedShards = clusterState.routingNodes().shardsWithState(ShardRoutingState.STARTED);
        // 2 shards, each with a primary and one replica
        assertThat(startedShards.size(), equalTo(4));
        for (MutableShardRouting startedShard : startedShards) {
            assertThat(startedShard.currentNodeId(), Matchers.anyOf(equalTo("node1"), equalTo("node2")));
        }
    }

    /**
     * Index level filters: same scenario as the cluster level test, followed by
     * a settings change that swaps value2 and value4 and must relocate the
     * shards from node2 to node4.
     */
    @Test public void testIndexFilters() {
        AllocationService strategy = new AllocationService(settingsBuilder()
                .build());

        logger.info("Building initial routing table");

        MetaData metaData = newMetaDataBuilder()
                .put(newIndexMetaDataBuilder("test").settings(settingsBuilder()
                        .put("index.number_of_shards", 2)
                        .put("index.number_of_replicas", 1)
                        .put("index.routing.allocation.include.tag1", "value1,value2")
                        .put("index.routing.allocation.exclude.tag1", "value3,value4")
                        .build()))
                .build();

        RoutingTable routingTable = routingTable()
                .add(indexRoutingTable("test").initializeEmpty(metaData.index("test")))
                .build();

        ClusterState clusterState = newClusterStateBuilder().metaData(metaData).routingTable(routingTable).build();

        // four nodes are registered here, one per tag value
        logger.info("--> adding four nodes and performing rerouting");
        clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder()
                .put(newNode("node1", ImmutableMap.of("tag1", "value1")))
                .put(newNode("node2", ImmutableMap.of("tag1", "value2")))
                .put(newNode("node3", ImmutableMap.of("tag1", "value3")))
                .put(newNode("node4", ImmutableMap.of("tag1", "value4")))
        ).build();
        routingTable = strategy.reroute(clusterState).routingTable();
        clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
        // only two shard copies are initializing after the first reroute
        assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(2));

        logger.info("--> start the shards (primaries)");
        routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable();
        clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();

        logger.info("--> start the shards (replicas)");
        routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable();
        clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();

        logger.info("--> make sure shards are only allocated on tag1 with value1 and value2");
        List<MutableShardRouting> startedShards = clusterState.routingNodes().shardsWithState(ShardRoutingState.STARTED);
        // 2 shards, each with a primary and one replica
        assertThat(startedShards.size(), equalTo(4));
        for (MutableShardRouting startedShard : startedShards) {
            assertThat(startedShard.currentNodeId(), Matchers.anyOf(equalTo("node1"), equalTo("node2")));
        }

        logger.info("--> switch between value2 and value4, shards should be relocating");

        // same index, but value4 is now included and value2 excluded
        metaData = newMetaDataBuilder()
                .put(newIndexMetaDataBuilder("test").settings(settingsBuilder()
                        .put("index.number_of_shards", 2)
                        .put("index.number_of_replicas", 1)
                        .put("index.routing.allocation.include.tag1", "value1,value4")
                        .put("index.routing.allocation.exclude.tag1", "value2,value3")
                        .build()))
                .build();
        clusterState = newClusterStateBuilder().state(clusterState).metaData(metaData).build();
        routingTable = strategy.reroute(clusterState).routingTable();
        clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
        // two copies stay started, the other two are relocating
        assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(2));
        assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.RELOCATING).size(), equalTo(2));

        logger.info("--> finish relocation");
        routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable();
        clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();

        startedShards = clusterState.routingNodes().shardsWithState(ShardRoutingState.STARTED);
        assertThat(startedShards.size(), equalTo(4));
        for (MutableShardRouting startedShard : startedShards) {
            assertThat(startedShard.currentNodeId(), Matchers.anyOf(equalTo("node1"), equalTo("node4")));
        }
    }
}

View File

@ -22,9 +22,15 @@ package org.elasticsearch.cluster.routing.allocation;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.transport.DummyTransportAddress;
import java.util.Map;
/**
 * Static factories for the {@link DiscoveryNode} instances used by the routing
 * allocation tests. Every node is created with {@code DummyTransportAddress.INSTANCE}.
 */
public class RoutingAllocationTests {
/**
 * Creates a node with the given id and no explicit attributes.
 */
public static DiscoveryNode newNode(String nodeId) {
return new DiscoveryNode(nodeId, DummyTransportAddress.INSTANCE);
}
/**
 * Creates a node with the given id and the given attributes.
 * The first constructor argument is passed as an empty string
 * (presumably the node name - TODO confirm against DiscoveryNode).
 */
public static DiscoveryNode newNode(String nodeId, Map<String, String> attributes) {
return new DiscoveryNode("", nodeId, DummyTransportAddress.INSTANCE, attributes);
}
}