Remove ShardsAllocators and merge allocateUnassigned, moveShards and rebalance to improve performance

This commit is contained in:
Yannick Welsch 2016-03-07 11:04:24 +01:00
parent 42a6869bb1
commit f6ae9ec4f6
11 changed files with 66 additions and 237 deletions

View File

@ -20,7 +20,6 @@
package org.elasticsearch.cluster.routing.allocation; package org.elasticsearch.cluster.routing.allocation;
import com.carrotsearch.hppc.cursors.ObjectCursor; import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.apache.lucene.util.ArrayUtil;
import org.elasticsearch.cluster.ClusterInfoService; import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.health.ClusterHealthStatus;
@ -36,13 +35,13 @@ import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocators; import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands; import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.gateway.GatewayAllocator;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
@ -63,14 +62,17 @@ import java.util.stream.Collectors;
public class AllocationService extends AbstractComponent { public class AllocationService extends AbstractComponent {
private final AllocationDeciders allocationDeciders; private final AllocationDeciders allocationDeciders;
private final GatewayAllocator gatewayAllocator;
private final ShardsAllocator shardsAllocator;
private final ClusterInfoService clusterInfoService; private final ClusterInfoService clusterInfoService;
private final ShardsAllocators shardsAllocators;
@Inject @Inject
public AllocationService(Settings settings, AllocationDeciders allocationDeciders, ShardsAllocators shardsAllocators, ClusterInfoService clusterInfoService) { public AllocationService(Settings settings, AllocationDeciders allocationDeciders, GatewayAllocator gatewayAllocator,
ShardsAllocator shardsAllocator, ClusterInfoService clusterInfoService) {
super(settings); super(settings);
this.allocationDeciders = allocationDeciders; this.allocationDeciders = allocationDeciders;
this.shardsAllocators = shardsAllocators; this.gatewayAllocator = gatewayAllocator;
this.shardsAllocator = shardsAllocator;
this.clusterInfoService = clusterInfoService; this.clusterInfoService = clusterInfoService;
} }
@ -92,7 +94,7 @@ public class AllocationService extends AbstractComponent {
if (!changed) { if (!changed) {
return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData()); return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData());
} }
shardsAllocators.applyStartedShards(allocation); gatewayAllocator.applyStartedShards(allocation);
if (withReroute) { if (withReroute) {
reroute(allocation); reroute(allocation);
} }
@ -192,7 +194,7 @@ public class AllocationService extends AbstractComponent {
if (!changed) { if (!changed) {
return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData()); return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData());
} }
shardsAllocators.applyFailedShards(allocation); gatewayAllocator.applyFailedShards(allocation);
reroute(allocation); reroute(allocation);
final RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), routingNodes); final RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), routingNodes);
String failedShardsAsString = firstListElementsToCommaDelimitedString(failedShards, s -> s.shard.shardId().toString()); String failedShardsAsString = firstListElementsToCommaDelimitedString(failedShards, s -> s.shard.shardId().toString());
@ -306,14 +308,10 @@ public class AllocationService extends AbstractComponent {
if (allocation.routingNodes().unassigned().size() > 0) { if (allocation.routingNodes().unassigned().size() > 0) {
updateLeftDelayOfUnassignedShards(allocation, settings); updateLeftDelayOfUnassignedShards(allocation, settings);
changed |= shardsAllocators.allocateUnassigned(allocation); changed |= gatewayAllocator.allocateUnassigned(allocation);
} }
// move shards that no longer can be allocated changed |= shardsAllocator.allocate(allocation);
changed |= shardsAllocators.moveShards(allocation);
// rebalance
changed |= shardsAllocators.rebalance(allocation);
assert RoutingNodes.assertShardStats(allocation.routingNodes()); assert RoutingNodes.assertShardStats(allocation.routingNodes());
return changed; return changed;
} }

View File

@ -103,27 +103,26 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
} }
@Override @Override
public void applyStartedShards(StartedRerouteAllocation allocation) { /* ONLY FOR GATEWAYS */ } public boolean allocate(RoutingAllocation allocation) {
@Override
public void applyFailedShards(FailedRerouteAllocation allocation) { /* ONLY FOR GATEWAYS */ }
@Override
public boolean allocateUnassigned(RoutingAllocation allocation) {
final Balancer balancer = new Balancer(logger, allocation, weightFunction, threshold); final Balancer balancer = new Balancer(logger, allocation, weightFunction, threshold);
return balancer.allocateUnassigned(); boolean changed = false;
} if (allocation.routingNodes().unassigned().size() > 0) {
changed |= balancer.allocateUnassigned();
@Override }
public boolean rebalance(RoutingAllocation allocation) { changed |= balancer.moveShards();
final Balancer balancer = new Balancer(logger, allocation, weightFunction, threshold); if (allocation.hasPendingAsyncFetch() == false) {
return balancer.balance(); /*
} * see https://github.com/elastic/elasticsearch/issues/14387
* if we allow rebalance operations while we are still fetching shard store data
@Override * we might end up with unnecessary rebalance operations which can be super confusion/frustrating
public boolean moveShards(RoutingAllocation allocation) { * since once the fetches come back we might just move all the shards back again.
final Balancer balancer = new Balancer(logger, allocation, weightFunction, threshold); * Therefore we only do a rebalance if we have fetched all information.
return balancer.moveShards(); */
changed |= balancer.balance();
} else {
logger.debug("skipping rebalance due to in-flight shard/store fetches");
}
return changed;
} }
/** /**

View File

@ -19,11 +19,7 @@
package org.elasticsearch.cluster.routing.allocation.allocator; package org.elasticsearch.cluster.routing.allocation.allocator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation;
/** /**
* <p> * <p>
@ -34,41 +30,13 @@ import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation;
*/ */
public interface ShardsAllocator { public interface ShardsAllocator {
/**
* Applies changes on started nodes based on the implemented algorithm. For example if a
* shard has changed to {@link ShardRoutingState#STARTED} from {@link ShardRoutingState#RELOCATING}
* this allocator might apply some cleanups on the node that used to hold the shard.
* @param allocation all started {@link ShardRouting shards}
*/
void applyStartedShards(StartedRerouteAllocation allocation);
/**
* Applies changes on failed nodes based on the implemented algorithm.
* @param allocation all failed {@link ShardRouting shards}
*/
void applyFailedShards(FailedRerouteAllocation allocation);
/** /**
* Assign all unassigned shards to nodes * Assign all unassigned shards to nodes
* * Move started shards that can not be allocated to a node anymore
* @param allocation current node allocation
* @return <code>true</code> if the allocation has changed, otherwise <code>false</code>
*/
boolean allocateUnassigned(RoutingAllocation allocation);
/**
* Rebalancing number of shards on all nodes * Rebalancing number of shards on all nodes
* *
* @param allocation current node allocation * @param allocation current node allocation
* @return <code>true</code> if the allocation has changed, otherwise <code>false</code> * @return <code>true</code> if the allocation has changed, otherwise <code>false</code>
*/ */
boolean rebalance(RoutingAllocation allocation); boolean allocate(RoutingAllocation allocation);
/**
* Move started shards that can not be allocated to a node anymore
*
* @param allocation current node allocation
* @return <code>true</code> if the allocation has changed, otherwise <code>false</code>
*/
boolean moveShards(RoutingAllocation allocation);
} }

View File

@ -1,100 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.routing.allocation.allocator;
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.gateway.GatewayAllocator;
/**
* The {@link ShardsAllocator} class offers methods for allocating shard within a cluster.
* These methods include moving shards and re-balancing the cluster. It also allows management
* of shards by their state.
*/
public class ShardsAllocators extends AbstractComponent implements ShardsAllocator {
private final GatewayAllocator gatewayAllocator;
private final ShardsAllocator allocator;
public ShardsAllocators(GatewayAllocator allocator) {
this(Settings.Builder.EMPTY_SETTINGS, allocator);
}
public ShardsAllocators(Settings settings, GatewayAllocator allocator) {
this(settings, allocator, new BalancedShardsAllocator(settings));
}
@Inject
public ShardsAllocators(Settings settings, GatewayAllocator gatewayAllocator, ShardsAllocator allocator) {
super(settings);
this.gatewayAllocator = gatewayAllocator;
this.allocator = allocator;
}
@Override
public void applyStartedShards(StartedRerouteAllocation allocation) {
gatewayAllocator.applyStartedShards(allocation);
allocator.applyStartedShards(allocation);
}
@Override
public void applyFailedShards(FailedRerouteAllocation allocation) {
gatewayAllocator.applyFailedShards(allocation);
allocator.applyFailedShards(allocation);
}
@Override
public boolean allocateUnassigned(RoutingAllocation allocation) {
boolean changed = false;
changed |= gatewayAllocator.allocateUnassigned(allocation);
changed |= allocator.allocateUnassigned(allocation);
return changed;
}
protected long nanoTime() {
return System.nanoTime();
}
@Override
public boolean rebalance(RoutingAllocation allocation) {
if (allocation.hasPendingAsyncFetch() == false) {
/*
* see https://github.com/elastic/elasticsearch/issues/14387
* if we allow rebalance operations while we are still fetching shard store data
* we might end up with unnecessary rebalance operations which can be super confusion/frustrating
* since once the fetches come back we might just move all the shards back again.
* Therefore we only do a rebalance if we have fetched all information.
*/
return allocator.rebalance(allocation);
} else {
logger.debug("skipping rebalance due to in-flight shard/store fetches");
return false;
}
}
@Override
public boolean moveShards(RoutingAllocation allocation) {
return allocator.moveShards(allocation);
}
}

View File

@ -48,19 +48,7 @@ public class ClusterModuleTests extends ModuleTestCase {
static class FakeShardsAllocator implements ShardsAllocator { static class FakeShardsAllocator implements ShardsAllocator {
@Override @Override
public void applyStartedShards(StartedRerouteAllocation allocation) {} public boolean allocate(RoutingAllocation allocation) {
@Override
public void applyFailedShards(FailedRerouteAllocation allocation) {}
@Override
public boolean allocateUnassigned(RoutingAllocation allocation) {
return false;
}
@Override
public boolean rebalance(RoutingAllocation allocation) {
return false;
}
@Override
public boolean moveShards(RoutingAllocation allocation) {
return false; return false;
} }
} }

View File

@ -35,7 +35,6 @@ import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator; import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocators;
import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.logging.Loggers;
@ -311,29 +310,9 @@ public class BalanceConfigurationTests extends ESAllocationTestCase {
public void testNoRebalanceOnPrimaryOverload() { public void testNoRebalanceOnPrimaryOverload() {
Settings.Builder settings = settingsBuilder(); Settings.Builder settings = settingsBuilder();
AllocationService strategy = new AllocationService(settings.build(), randomAllocationDeciders(settings.build(), AllocationService strategy = new AllocationService(settings.build(), randomAllocationDeciders(settings.build(),
new ClusterSettings(Settings.Builder.EMPTY_SETTINGS, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), getRandom()), new ShardsAllocators(settings.build(), new ClusterSettings(Settings.Builder.EMPTY_SETTINGS, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), getRandom()),
NoopGatewayAllocator.INSTANCE, new ShardsAllocator() { NoopGatewayAllocator.INSTANCE, new ShardsAllocator() {
@Override
public boolean rebalance(RoutingAllocation allocation) {
return false;
}
@Override
public boolean moveShards(RoutingAllocation allocation) {
return false;
}
@Override
public void applyStartedShards(StartedRerouteAllocation allocation) {
}
@Override
public void applyFailedShards(FailedRerouteAllocation allocation) {
}
/* /*
* // this allocator tries to rebuild this scenario where a rebalance is * // this allocator tries to rebuild this scenario where a rebalance is
* // triggered solely by the primary overload on node [1] where a shard * // triggered solely by the primary overload on node [1] where a shard
@ -354,9 +333,8 @@ public class BalanceConfigurationTests extends ESAllocationTestCase {
--------[test][2], node[3], [P], s[STARTED] --------[test][2], node[3], [P], s[STARTED]
--------[test][3], node[3], [P], s[STARTED] --------[test][3], node[3], [P], s[STARTED]
---- unassigned ---- unassigned
*/ */
@Override public boolean allocate(RoutingAllocation allocation) {
public boolean allocateUnassigned(RoutingAllocation allocation) {
RoutingNodes.UnassignedShards unassigned = allocation.routingNodes().unassigned(); RoutingNodes.UnassignedShards unassigned = allocation.routingNodes().unassigned();
boolean changed = !unassigned.isEmpty(); boolean changed = !unassigned.isEmpty();
ShardRouting[] drain = unassigned.drain(); ShardRouting[] drain = unassigned.drain();
@ -403,7 +381,7 @@ public class BalanceConfigurationTests extends ESAllocationTestCase {
} }
return changed; return changed;
} }
}), EmptyClusterInfoService.INSTANCE); }, EmptyClusterInfoService.INSTANCE);
MetaData.Builder metaDataBuilder = MetaData.builder(); MetaData.Builder metaDataBuilder = MetaData.builder();
RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
IndexMetaData.Builder indexMeta = IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(5).numberOfReplicas(1); IndexMetaData.Builder indexMeta = IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(5).numberOfReplicas(1);

View File

@ -36,7 +36,7 @@ import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocators; import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands; import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
@ -333,7 +333,7 @@ public class NodeVersionAllocationDeciderTests extends ESAllocationTestCase {
AllocationDeciders allocationDeciders = new AllocationDeciders(Settings.EMPTY, new AllocationDecider[] {new NodeVersionAllocationDecider(Settings.EMPTY)}); AllocationDeciders allocationDeciders = new AllocationDeciders(Settings.EMPTY, new AllocationDecider[] {new NodeVersionAllocationDecider(Settings.EMPTY)});
AllocationService strategy = new MockAllocationService(Settings.EMPTY, AllocationService strategy = new MockAllocationService(Settings.EMPTY,
allocationDeciders, allocationDeciders,
new ShardsAllocators(Settings.EMPTY, NoopGatewayAllocator.INSTANCE), EmptyClusterInfoService.INSTANCE); NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE);
RoutingAllocation.Result result = strategy.reroute(state, new AllocationCommands(), true); RoutingAllocation.Result result = strategy.reroute(state, new AllocationCommands(), true);
// the two indices must stay as is, the replicas cannot move to oldNode2 because versions don't match // the two indices must stay as is, the replicas cannot move to oldNode2 because versions don't match
state = ClusterState.builder(state).routingResult(result).build(); state = ClusterState.builder(state).routingResult(result).build();
@ -363,7 +363,7 @@ public class NodeVersionAllocationDeciderTests extends ESAllocationTestCase {
new NodeVersionAllocationDecider(Settings.EMPTY)}); new NodeVersionAllocationDecider(Settings.EMPTY)});
AllocationService strategy = new MockAllocationService(Settings.EMPTY, AllocationService strategy = new MockAllocationService(Settings.EMPTY,
allocationDeciders, allocationDeciders,
new ShardsAllocators(Settings.EMPTY, NoopGatewayAllocator.INSTANCE), EmptyClusterInfoService.INSTANCE); NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE);
RoutingAllocation.Result result = strategy.reroute(state, new AllocationCommands(), true); RoutingAllocation.Result result = strategy.reroute(state, new AllocationCommands(), true);
// Make sure that primary shards are only allocated on the new node // Make sure that primary shards are only allocated on the new node

View File

@ -30,7 +30,7 @@ import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocators; import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.cluster.routing.allocation.decider.Decision; import org.elasticsearch.cluster.routing.allocation.decider.Decision;
@ -59,7 +59,7 @@ public class RandomAllocationDeciderTests extends ESAllocationTestCase {
RandomAllocationDecider randomAllocationDecider = new RandomAllocationDecider(getRandom()); RandomAllocationDecider randomAllocationDecider = new RandomAllocationDecider(getRandom());
AllocationService strategy = new AllocationService(settingsBuilder().build(), new AllocationDeciders(Settings.EMPTY, AllocationService strategy = new AllocationService(settingsBuilder().build(), new AllocationDeciders(Settings.EMPTY,
new HashSet<>(Arrays.asList(new SameShardAllocationDecider(Settings.EMPTY), new ReplicaAfterPrimaryActiveAllocationDecider(Settings.EMPTY), new HashSet<>(Arrays.asList(new SameShardAllocationDecider(Settings.EMPTY), new ReplicaAfterPrimaryActiveAllocationDecider(Settings.EMPTY),
randomAllocationDecider))), new ShardsAllocators(NoopGatewayAllocator.INSTANCE), EmptyClusterInfoService.INSTANCE); randomAllocationDecider))), NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE);
int indices = scaledRandomIntBetween(1, 20); int indices = scaledRandomIntBetween(1, 20);
Builder metaBuilder = MetaData.builder(); Builder metaBuilder = MetaData.builder();
int maxNumReplicas = 1; int maxNumReplicas = 1;

View File

@ -39,7 +39,7 @@ import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocators; import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommand; import org.elasticsearch.cluster.routing.allocation.command.AllocationCommand;
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands; import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand; import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
@ -65,10 +65,6 @@ import static org.hamcrest.Matchers.nullValue;
public class DiskThresholdDeciderTests extends ESAllocationTestCase { public class DiskThresholdDeciderTests extends ESAllocationTestCase {
private static ShardsAllocators makeShardsAllocators() {
return new ShardsAllocators(NoopGatewayAllocator.INSTANCE);
}
public void testDiskThreshold() { public void testDiskThreshold() {
Settings diskSettings = settingsBuilder() Settings diskSettings = settingsBuilder()
.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), true) .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), true)
@ -109,7 +105,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
.put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.build(), deciders, makeShardsAllocators(), cis); .build(), deciders, NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), cis);
MetaData metaData = MetaData.builder() MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)) .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))
@ -194,7 +190,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
.put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.build(), deciders, makeShardsAllocators(), cis); .build(), deciders, NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), cis);
routingTable = strategy.reroute(clusterState, "reroute").routingTable(); routingTable = strategy.reroute(clusterState, "reroute").routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
@ -225,7 +221,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
.put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.build(), deciders, makeShardsAllocators(), cis); .build(), deciders, NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), cis);
routingTable = strategy.reroute(clusterState, "reroute").routingTable(); routingTable = strategy.reroute(clusterState, "reroute").routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
@ -305,7 +301,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
.put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.build(), deciders, makeShardsAllocators(), cis); .build(), deciders, NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), cis);
MetaData metaData = MetaData.builder() MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(2)) .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(2))
@ -362,7 +358,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
.put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.build(), deciders, makeShardsAllocators(), cis); .build(), deciders, NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), cis);
routingTable = strategy.reroute(clusterState, "reroute").routingTable(); routingTable = strategy.reroute(clusterState, "reroute").routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
@ -429,7 +425,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
.put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.build(), deciders, makeShardsAllocators(), cis); .build(), deciders, NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), cis);
routingTable = strategy.reroute(clusterState, "reroute").routingTable(); routingTable = strategy.reroute(clusterState, "reroute").routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
@ -460,7 +456,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
.put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.build(), deciders, makeShardsAllocators(), cis); .build(), deciders, NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), cis);
routingTable = strategy.reroute(clusterState, "reroute").routingTable(); routingTable = strategy.reroute(clusterState, "reroute").routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
@ -569,7 +565,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
.put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.build(), deciders, makeShardsAllocators(), cis); .build(), deciders, NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), cis);
MetaData metaData = MetaData.builder() MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(0)) .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(0))
@ -637,7 +633,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
.put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.build(), deciders, makeShardsAllocators(), cis); .build(), deciders, NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), cis);
MetaData metaData = MetaData.builder() MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(0)) .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(0))
@ -740,7 +736,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
.put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.build(), deciders, makeShardsAllocators(), cis); .build(), deciders, NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), cis);
MetaData metaData = MetaData.builder() MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)) .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))
@ -902,7 +898,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
.put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.build(), deciders, makeShardsAllocators(), cis); .build(), deciders, NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), cis);
// Ensure that the reroute call doesn't alter the routing table, since the first primary is relocating away // Ensure that the reroute call doesn't alter the routing table, since the first primary is relocating away
// and therefor we will have sufficient disk space on node1. // and therefor we will have sufficient disk space on node1.
RoutingAllocation.Result result = strategy.reroute(clusterState, "reroute"); RoutingAllocation.Result result = strategy.reroute(clusterState, "reroute");
@ -1003,7 +999,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.build(), deciders, makeShardsAllocators(), cis); .build(), deciders, NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), cis);
RoutingAllocation.Result result = strategy.reroute(clusterState, "reroute"); RoutingAllocation.Result result = strategy.reroute(clusterState, "reroute");
assertThat(result.routingTable().index("test").getShards().get(0).primaryShard().state(), equalTo(STARTED)); assertThat(result.routingTable().index("test").getShards().get(0).primaryShard().state(), equalTo(STARTED));

View File

@ -492,7 +492,7 @@ public class NodeJoinControllerTests extends ESTestCase {
static class NoopAllocationService extends AllocationService { static class NoopAllocationService extends AllocationService {
public NoopAllocationService(Settings settings) { public NoopAllocationService(Settings settings) {
super(settings, null, null, null); super(settings, null, null, null, null);
} }
@Override @Override

View File

@ -33,7 +33,8 @@ import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation; import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation; import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation;
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocators; import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.cluster.routing.allocation.decider.Decision; import org.elasticsearch.cluster.routing.allocation.decider.Decision;
@ -79,19 +80,19 @@ public abstract class ESAllocationTestCase extends ESTestCase {
public static MockAllocationService createAllocationService(Settings settings, ClusterSettings clusterSettings, Random random) { public static MockAllocationService createAllocationService(Settings settings, ClusterSettings clusterSettings, Random random) {
return new MockAllocationService(settings, return new MockAllocationService(settings,
randomAllocationDeciders(settings, clusterSettings, random), randomAllocationDeciders(settings, clusterSettings, random),
new ShardsAllocators(settings, NoopGatewayAllocator.INSTANCE), EmptyClusterInfoService.INSTANCE); NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(settings), EmptyClusterInfoService.INSTANCE);
} }
public static MockAllocationService createAllocationService(Settings settings, ClusterInfoService clusterInfoService) { public static MockAllocationService createAllocationService(Settings settings, ClusterInfoService clusterInfoService) {
return new MockAllocationService(settings, return new MockAllocationService(settings,
randomAllocationDeciders(settings, new ClusterSettings(Settings.Builder.EMPTY_SETTINGS, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), getRandom()), randomAllocationDeciders(settings, new ClusterSettings(Settings.Builder.EMPTY_SETTINGS, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), getRandom()),
new ShardsAllocators(settings, NoopGatewayAllocator.INSTANCE), clusterInfoService); NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(settings), clusterInfoService);
} }
public static MockAllocationService createAllocationService(Settings settings, GatewayAllocator allocator) { public static MockAllocationService createAllocationService(Settings settings, GatewayAllocator gatewayAllocator) {
return new MockAllocationService(settings, return new MockAllocationService(settings,
randomAllocationDeciders(settings, new ClusterSettings(Settings.Builder.EMPTY_SETTINGS, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), getRandom()), randomAllocationDeciders(settings, new ClusterSettings(Settings.Builder.EMPTY_SETTINGS, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), getRandom()),
new ShardsAllocators(settings, allocator), EmptyClusterInfoService.INSTANCE); gatewayAllocator, new BalancedShardsAllocator(settings), EmptyClusterInfoService.INSTANCE);
} }
@ -193,8 +194,9 @@ public abstract class ESAllocationTestCase extends ESTestCase {
private Long nanoTimeOverride = null; private Long nanoTimeOverride = null;
public MockAllocationService(Settings settings, AllocationDeciders allocationDeciders, ShardsAllocators shardsAllocators, ClusterInfoService clusterInfoService) { public MockAllocationService(Settings settings, AllocationDeciders allocationDeciders, GatewayAllocator gatewayAllocator,
super(settings, allocationDeciders, shardsAllocators, clusterInfoService); ShardsAllocator shardsAllocator, ClusterInfoService clusterInfoService) {
super(settings, allocationDeciders, gatewayAllocator, shardsAllocator, clusterInfoService);
} }
public void setNanoTimeOverride(long nanoTime) { public void setNanoTimeOverride(long nanoTime) {