Also use PriorityComparator in shard balancer

Today we try to allocate primaries first and then replicas
but don't take the index creation date and priority into account
as we do in the GatewayAlloactor.

Closes #13249
This commit is contained in:
Simon Willnauer 2015-09-01 21:27:28 +02:00
parent cf539a2158
commit f208aaa47c
4 changed files with 123 additions and 10 deletions

View File

@ -21,6 +21,7 @@ package org.elasticsearch.cluster.routing.allocation.allocator;
import org.apache.lucene.util.ArrayUtil; import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.IntroSorter; import org.apache.lucene.util.IntroSorter;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.RoutingNode;
@ -36,6 +37,7 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.gateway.PriorityComparator;
import org.elasticsearch.node.settings.NodeSettingsService; import org.elasticsearch.node.settings.NodeSettingsService;
import java.util.*; import java.util.*;
@ -559,6 +561,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
* use the sorter to save some iterations. * use the sorter to save some iterations.
*/ */
final AllocationDeciders deciders = allocation.deciders(); final AllocationDeciders deciders = allocation.deciders();
final PriorityComparator secondaryComparator = PriorityComparator.getAllocationComparator(allocation);
final Comparator<ShardRouting> comparator = new Comparator<ShardRouting>() { final Comparator<ShardRouting> comparator = new Comparator<ShardRouting>() {
@Override @Override
public int compare(ShardRouting o1, public int compare(ShardRouting o1,
@ -570,7 +573,12 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
if ((indexCmp = o1.index().compareTo(o2.index())) == 0) { if ((indexCmp = o1.index().compareTo(o2.index())) == 0) {
return o1.getId() - o2.getId(); return o1.getId() - o2.getId();
} }
return indexCmp; // this comparator is more expensive than all the others up there
// that's why it's added last even though it could be easier to read
// if we'd apply it earlier. this comparator will only differentiate across
// indices all shards of the same index is treated equally.
final int secondary = secondaryComparator.compare(o1, o2);
return secondary == 0 ? indexCmp : secondary;
} }
}; };
/* /*

View File

@ -120,14 +120,7 @@ public class GatewayAllocator extends AbstractComponent {
boolean changed = false; boolean changed = false;
RoutingNodes.UnassignedShards unassigned = allocation.routingNodes().unassigned(); RoutingNodes.UnassignedShards unassigned = allocation.routingNodes().unassigned();
unassigned.sort(new PriorityComparator() { unassigned.sort(PriorityComparator.getAllocationComparator(allocation)); // sort for priority ordering
@Override
protected Settings getIndexSettings(String index) {
IndexMetaData indexMetaData = allocation.metaData().index(index);
return indexMetaData.getSettings();
}
}); // sort for priority ordering
changed |= primaryShardAllocator.allocateUnassigned(allocation); changed |= primaryShardAllocator.allocateUnassigned(allocation);
changed |= replicaShardAllocator.processExistingRecoveries(allocation); changed |= replicaShardAllocator.processExistingRecoveries(allocation);

View File

@ -21,6 +21,7 @@ package org.elasticsearch.gateway;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import java.util.Comparator; import java.util.Comparator;
@ -33,7 +34,7 @@ import java.util.Comparator;
* here the newer indices matter more). If even that is the same, we compare the index name which is useful * here the newer indices matter more). If even that is the same, we compare the index name which is useful
* if the date is baked into the index name. ie logstash-2015.05.03. * if the date is baked into the index name. ie logstash-2015.05.03.
*/ */
abstract class PriorityComparator implements Comparator<ShardRouting> { public abstract class PriorityComparator implements Comparator<ShardRouting> {
@Override @Override
public final int compare(ShardRouting o1, ShardRouting o2) { public final int compare(ShardRouting o1, ShardRouting o2) {
@ -63,4 +64,17 @@ abstract class PriorityComparator implements Comparator<ShardRouting> {
} }
protected abstract Settings getIndexSettings(String index); protected abstract Settings getIndexSettings(String index);
/**
* Returns a PriorityComparator that uses the RoutingAllocation index metadata to access the index setting per index.
*/
public static PriorityComparator getAllocationComparator(final RoutingAllocation allocation) {
return new PriorityComparator() {
@Override
protected Settings getIndexSettings(String index) {
IndexMetaData indexMetaData = allocation.metaData().index(index);
return indexMetaData.getSettings();
}
};
}
} }

View File

@ -0,0 +1,98 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.routing.allocation;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
import org.elasticsearch.test.ESAllocationTestCase;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
public class AllocationPriorityTests extends ESAllocationTestCase {
/**
* Tests that higher prioritized primaries and replicas are allocated first even on the balanced shard allocator
* See https://github.com/elastic/elasticsearch/issues/13249 for details
*/
public void testPrioritizedIndicesAllocatedFirst() {
AllocationService allocation = createAllocationService(settingsBuilder().
put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_CONCURRENT_RECOVERIES, 1)
.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES, 1)
.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES, 1).build());
final String highPriorityName;
final String lowPriorityName;
final int priorityFirst;
final int prioritySecond;
if (randomBoolean()) {
highPriorityName = "first";
lowPriorityName = "second";
prioritySecond = 1;
priorityFirst = 100;
} else {
lowPriorityName = "first";
highPriorityName = "second";
prioritySecond = 100;
priorityFirst = 1;
}
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("first").settings(settings(Version.CURRENT).put(IndexMetaData.SETTING_PRIORITY, priorityFirst)).numberOfShards(2).numberOfReplicas(1))
.put(IndexMetaData.builder("second").settings(settings(Version.CURRENT).put(IndexMetaData.SETTING_PRIORITY, prioritySecond)).numberOfShards(2).numberOfReplicas(1))
.build();
RoutingTable routingTable = RoutingTable.builder()
.addAsNew(metaData.index("first"))
.addAsNew(metaData.index("second"))
.build();
ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build();
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2"))).build();
RoutingAllocation.Result rerouteResult = allocation.reroute(clusterState);
clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build();
routingTable = allocation.reroute(clusterState).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
assertEquals(2, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size());
assertEquals(highPriorityName, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).get(0).index());
assertEquals(highPriorityName, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).get(1).index());
routingTable = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING)).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
assertEquals(2, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size());
assertEquals(lowPriorityName, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).get(0).index());
assertEquals(lowPriorityName, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).get(1).index());
routingTable = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING)).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
assertEquals(2, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size());
assertEquals(highPriorityName, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).get(0).index());
assertEquals(highPriorityName, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).get(1).index());
routingTable = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING)).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
assertEquals(2, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size());
assertEquals(lowPriorityName, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).get(0).index());
assertEquals(lowPriorityName, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).get(1).index());
}
}