diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index 21f0c7a13e2..0cb21055974 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -21,6 +21,7 @@ package org.elasticsearch.cluster.routing.allocation.allocator; import org.apache.lucene.util.ArrayUtil; import org.apache.lucene.util.IntroSorter; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.RoutingNode; @@ -36,6 +37,7 @@ import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.gateway.PriorityComparator; import org.elasticsearch.node.settings.NodeSettingsService; import java.util.*; @@ -559,6 +561,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards * use the sorter to save some iterations. 
*/ final AllocationDeciders deciders = allocation.deciders(); + final PriorityComparator secondaryComparator = PriorityComparator.getAllocationComparator(allocation); final Comparator<ShardRouting> comparator = new Comparator<ShardRouting>() { @Override public int compare(ShardRouting o1, @@ -570,7 +573,12 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards if ((indexCmp = o1.index().compareTo(o2.index())) == 0) { return o1.getId() - o2.getId(); } - return indexCmp; + // this comparator is more expensive than all the others up there + // that's why it's added last even though it could be easier to read + // if we'd apply it earlier. this comparator will only differentiate across + // indices; all shards of the same index are treated equally. + final int secondary = secondaryComparator.compare(o1, o2); + return secondary == 0 ? indexCmp : secondary; } }; /* diff --git a/core/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java b/core/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java index 449bd67e26c..50a77e197f0 100644 --- a/core/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java +++ b/core/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java @@ -120,14 +120,7 @@ public class GatewayAllocator extends AbstractComponent { boolean changed = false; RoutingNodes.UnassignedShards unassigned = allocation.routingNodes().unassigned(); - unassigned.sort(new PriorityComparator() { - - @Override - protected Settings getIndexSettings(String index) { - IndexMetaData indexMetaData = allocation.metaData().index(index); - return indexMetaData.getSettings(); - } - }); // sort for priority ordering + unassigned.sort(PriorityComparator.getAllocationComparator(allocation)); // sort for priority ordering changed |= primaryShardAllocator.allocateUnassigned(allocation); changed |= replicaShardAllocator.processExistingRecoveries(allocation); diff --git a/core/src/main/java/org/elasticsearch/gateway/PriorityComparator.java 
b/core/src/main/java/org/elasticsearch/gateway/PriorityComparator.java index 2176a70c74b..4f70bf41488 100644 --- a/core/src/main/java/org/elasticsearch/gateway/PriorityComparator.java +++ b/core/src/main/java/org/elasticsearch/gateway/PriorityComparator.java @@ -21,6 +21,7 @@ package org.elasticsearch.gateway; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.common.settings.Settings; import java.util.Comparator; @@ -33,7 +34,7 @@ import java.util.Comparator; * here the newer indices matter more). If even that is the same, we compare the index name which is useful * if the date is baked into the index name. ie logstash-2015.05.03. */ -abstract class PriorityComparator implements Comparator<ShardRouting> { +public abstract class PriorityComparator implements Comparator<ShardRouting> { @Override public final int compare(ShardRouting o1, ShardRouting o2) { @@ -63,4 +64,17 @@ abstract class PriorityComparator implements Comparator<ShardRouting> { } protected abstract Settings getIndexSettings(String index); + + /** + * Returns a PriorityComparator that uses the RoutingAllocation index metadata to access the index setting per index. 
+ */ + public static PriorityComparator getAllocationComparator(final RoutingAllocation allocation) { + return new PriorityComparator() { + @Override + protected Settings getIndexSettings(String index) { + IndexMetaData indexMetaData = allocation.metaData().index(index); + return indexMetaData.getSettings(); + } + }; + } } diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationPriorityTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationPriorityTests.java new file mode 100644 index 00000000000..6fbc92a66ad --- /dev/null +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationPriorityTests.java @@ -0,0 +1,98 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.elasticsearch.cluster.routing.allocation; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider; +import org.elasticsearch.test.ESAllocationTestCase; + +import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; +import static org.elasticsearch.common.settings.Settings.settingsBuilder; + +public class AllocationPriorityTests extends ESAllocationTestCase { + + /** + * Tests that higher prioritized primaries and replicas are allocated first even on the balanced shard allocator + * See https://github.com/elastic/elasticsearch/issues/13249 for details + */ + public void testPrioritizedIndicesAllocatedFirst() { + AllocationService allocation = createAllocationService(settingsBuilder(). 
+ put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_CONCURRENT_RECOVERIES, 1) + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES, 1) + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES, 1).build()); + final String highPriorityName; + final String lowPriorityName; + final int priorityFirst; + final int prioritySecond; + if (randomBoolean()) { + highPriorityName = "first"; + lowPriorityName = "second"; + prioritySecond = 1; + priorityFirst = 100; + } else { + lowPriorityName = "first"; + highPriorityName = "second"; + prioritySecond = 100; + priorityFirst = 1; + } + MetaData metaData = MetaData.builder() + .put(IndexMetaData.builder("first").settings(settings(Version.CURRENT).put(IndexMetaData.SETTING_PRIORITY, priorityFirst)).numberOfShards(2).numberOfReplicas(1)) + .put(IndexMetaData.builder("second").settings(settings(Version.CURRENT).put(IndexMetaData.SETTING_PRIORITY, prioritySecond)).numberOfShards(2).numberOfReplicas(1)) + .build(); + RoutingTable routingTable = RoutingTable.builder() + .addAsNew(metaData.index("first")) + .addAsNew(metaData.index("second")) + .build(); + ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build(); + + clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2"))).build(); + RoutingAllocation.Result rerouteResult = allocation.reroute(clusterState); + clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build(); + + routingTable = allocation.reroute(clusterState).routingTable(); + clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + assertEquals(2, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size()); + assertEquals(highPriorityName, 
clusterState.getRoutingNodes().shardsWithState(INITIALIZING).get(0).index()); + assertEquals(highPriorityName, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).get(1).index()); + + routingTable = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING)).routingTable(); + clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + assertEquals(2, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size()); + assertEquals(lowPriorityName, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).get(0).index()); + assertEquals(lowPriorityName, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).get(1).index()); + + routingTable = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING)).routingTable(); + clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + assertEquals(2, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size()); + assertEquals(highPriorityName, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).get(0).index()); + assertEquals(highPriorityName, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).get(1).index()); + + routingTable = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING)).routingTable(); + clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + assertEquals(2, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size()); + assertEquals(lowPriorityName, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).get(0).index()); + assertEquals(lowPriorityName, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).get(1).index()); + + } +}