From f208aaa47c277d323d52b79241cfb8eef64b5e90 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 1 Sep 2015 21:27:28 +0200 Subject: [PATCH] Also use PriorityComparator in shard balancer Today we try to allocate primaries first and then replicas but don't take the index creation date and priority into account as we do in the GatewayAllocator. Closes #13249 --- .../allocator/BalancedShardsAllocator.java | 10 +- .../gateway/GatewayAllocator.java | 9 +- .../gateway/PriorityComparator.java | 16 ++- .../allocation/AllocationPriorityTests.java | 98 +++++++++++++++++++ 4 files changed, 123 insertions(+), 10 deletions(-) create mode 100644 core/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationPriorityTests.java diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index 21f0c7a13e2..0cb21055974 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -21,6 +21,7 @@ package org.elasticsearch.cluster.routing.allocation.allocator; import org.apache.lucene.util.ArrayUtil; import org.apache.lucene.util.IntroSorter; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.RoutingNode; @@ -36,6 +37,7 @@ import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.gateway.PriorityComparator; import org.elasticsearch.node.settings.NodeSettingsService; import java.util.*; @@ -559,6 +561,7 @@ public class 
BalancedShardsAllocator extends AbstractComponent implements Shards * use the sorter to save some iterations. */ final AllocationDeciders deciders = allocation.deciders(); + final PriorityComparator secondaryComparator = PriorityComparator.getAllocationComparator(allocation); final Comparator comparator = new Comparator() { @Override public int compare(ShardRouting o1, @@ -570,7 +573,12 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards if ((indexCmp = o1.index().compareTo(o2.index())) == 0) { return o1.getId() - o2.getId(); } - return indexCmp; + // this comparator is more expensive than all the others up there + // that's why it's added last even though it could be easier to read + // if we'd apply it earlier. this comparator will only differentiate across + // indices; all shards of the same index are treated equally. + final int secondary = secondaryComparator.compare(o1, o2); + return secondary == 0 ? indexCmp : secondary; } }; /* diff --git a/core/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java b/core/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java index 449bd67e26c..50a77e197f0 100644 --- a/core/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java +++ b/core/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java @@ -120,14 +120,7 @@ public class GatewayAllocator extends AbstractComponent { boolean changed = false; RoutingNodes.UnassignedShards unassigned = allocation.routingNodes().unassigned(); - unassigned.sort(new PriorityComparator() { - - @Override - protected Settings getIndexSettings(String index) { - IndexMetaData indexMetaData = allocation.metaData().index(index); - return indexMetaData.getSettings(); - } - }); // sort for priority ordering + unassigned.sort(PriorityComparator.getAllocationComparator(allocation)); // sort for priority ordering changed |= primaryShardAllocator.allocateUnassigned(allocation); changed |= 
replicaShardAllocator.processExistingRecoveries(allocation); diff --git a/core/src/main/java/org/elasticsearch/gateway/PriorityComparator.java b/core/src/main/java/org/elasticsearch/gateway/PriorityComparator.java index 2176a70c74b..4f70bf41488 100644 --- a/core/src/main/java/org/elasticsearch/gateway/PriorityComparator.java +++ b/core/src/main/java/org/elasticsearch/gateway/PriorityComparator.java @@ -21,6 +21,7 @@ package org.elasticsearch.gateway; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.common.settings.Settings; import java.util.Comparator; @@ -33,7 +34,7 @@ import java.util.Comparator; * here the newer indices matter more). If even that is the same, we compare the index name which is useful * if the date is baked into the index name. ie logstash-2015.05.03. */ -abstract class PriorityComparator implements Comparator { +public abstract class PriorityComparator implements Comparator { @Override public final int compare(ShardRouting o1, ShardRouting o2) { @@ -63,4 +64,17 @@ abstract class PriorityComparator implements Comparator { } protected abstract Settings getIndexSettings(String index); + + /** + * Returns a PriorityComparator that uses the RoutingAllocation index metadata to access the index setting per index. 
+ */ + public static PriorityComparator getAllocationComparator(final RoutingAllocation allocation) { + return new PriorityComparator() { + @Override + protected Settings getIndexSettings(String index) { + IndexMetaData indexMetaData = allocation.metaData().index(index); + return indexMetaData.getSettings(); + } + }; + } } diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationPriorityTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationPriorityTests.java new file mode 100644 index 00000000000..6fbc92a66ad --- /dev/null +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationPriorityTests.java @@ -0,0 +1,98 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.elasticsearch.cluster.routing.allocation; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider; +import org.elasticsearch.test.ESAllocationTestCase; + +import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; +import static org.elasticsearch.common.settings.Settings.settingsBuilder; + +public class AllocationPriorityTests extends ESAllocationTestCase { + + /** + * Tests that higher prioritized primaries and replicas are allocated first even on the balanced shard allocator + * See https://github.com/elastic/elasticsearch/issues/13249 for details + */ + public void testPrioritizedIndicesAllocatedFirst() { + AllocationService allocation = createAllocationService(settingsBuilder(). 
+ put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_CONCURRENT_RECOVERIES, 1) + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES, 1) + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES, 1).build()); + final String highPriorityName; + final String lowPriorityName; + final int priorityFirst; + final int prioritySecond; + if (randomBoolean()) { + highPriorityName = "first"; + lowPriorityName = "second"; + prioritySecond = 1; + priorityFirst = 100; + } else { + lowPriorityName = "first"; + highPriorityName = "second"; + prioritySecond = 100; + priorityFirst = 1; + } + MetaData metaData = MetaData.builder() + .put(IndexMetaData.builder("first").settings(settings(Version.CURRENT).put(IndexMetaData.SETTING_PRIORITY, priorityFirst)).numberOfShards(2).numberOfReplicas(1)) + .put(IndexMetaData.builder("second").settings(settings(Version.CURRENT).put(IndexMetaData.SETTING_PRIORITY, prioritySecond)).numberOfShards(2).numberOfReplicas(1)) + .build(); + RoutingTable routingTable = RoutingTable.builder() + .addAsNew(metaData.index("first")) + .addAsNew(metaData.index("second")) + .build(); + ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build(); + + clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2"))).build(); + RoutingAllocation.Result rerouteResult = allocation.reroute(clusterState); + clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build(); + + routingTable = allocation.reroute(clusterState).routingTable(); + clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + assertEquals(2, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size()); + assertEquals(highPriorityName, 
clusterState.getRoutingNodes().shardsWithState(INITIALIZING).get(0).index()); + assertEquals(highPriorityName, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).get(1).index()); + + routingTable = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING)).routingTable(); + clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + assertEquals(2, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size()); + assertEquals(lowPriorityName, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).get(0).index()); + assertEquals(lowPriorityName, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).get(1).index()); + + routingTable = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING)).routingTable(); + clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + assertEquals(2, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size()); + assertEquals(highPriorityName, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).get(0).index()); + assertEquals(highPriorityName, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).get(1).index()); + + routingTable = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING)).routingTable(); + clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + assertEquals(2, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size()); + assertEquals(lowPriorityName, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).get(0).index()); + assertEquals(lowPriorityName, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).get(1).index()); + + } +}