From f6ae9ec4f608eeffbb4ecbcea98c0916ee523d12 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Mon, 7 Mar 2016 11:04:24 +0100 Subject: [PATCH] Remove ShardsAllocators and merge allocateUnassigned, moveShards and rebalance to improve performance --- .../routing/allocation/AllocationService.java | 26 +++-- .../allocator/BalancedShardsAllocator.java | 39 ++++--- .../allocation/allocator/ShardsAllocator.java | 36 +------ .../allocator/ShardsAllocators.java | 100 ------------------ .../cluster/ClusterModuleTests.java | 14 +-- .../allocation/BalanceConfigurationTests.java | 30 +----- .../NodeVersionAllocationDeciderTests.java | 6 +- .../RandomAllocationDeciderTests.java | 4 +- .../decider/DiskThresholdDeciderTests.java | 30 +++--- .../zen/NodeJoinControllerTests.java | 2 +- .../test/ESAllocationTestCase.java | 16 +-- 11 files changed, 66 insertions(+), 237 deletions(-) delete mode 100644 core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocators.java diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java index eeeb6e3389c..5c383bcae83 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java @@ -20,7 +20,6 @@ package org.elasticsearch.cluster.routing.allocation; import com.carrotsearch.hppc.cursors.ObjectCursor; -import org.apache.lucene.util.ArrayUtil; import org.elasticsearch.cluster.ClusterInfoService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.health.ClusterHealthStatus; @@ -36,13 +35,13 @@ import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.UnassignedInfo; -import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocators; +import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator; import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; -import org.elasticsearch.cluster.routing.allocation.decider.Decision; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.gateway.GatewayAllocator; import java.util.ArrayList; import java.util.Collections; @@ -63,14 +62,17 @@ import java.util.stream.Collectors; public class AllocationService extends AbstractComponent { private final AllocationDeciders allocationDeciders; + private final GatewayAllocator gatewayAllocator; + private final ShardsAllocator shardsAllocator; private final ClusterInfoService clusterInfoService; - private final ShardsAllocators shardsAllocators; @Inject - public AllocationService(Settings settings, AllocationDeciders allocationDeciders, ShardsAllocators shardsAllocators, ClusterInfoService clusterInfoService) { + public AllocationService(Settings settings, AllocationDeciders allocationDeciders, GatewayAllocator gatewayAllocator, + ShardsAllocator shardsAllocator, ClusterInfoService clusterInfoService) { super(settings); this.allocationDeciders = allocationDeciders; - this.shardsAllocators = shardsAllocators; + this.gatewayAllocator = gatewayAllocator; + this.shardsAllocator = shardsAllocator; this.clusterInfoService = clusterInfoService; } @@ -92,7 +94,7 @@ public class AllocationService extends AbstractComponent { if (!changed) { return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData()); } - shardsAllocators.applyStartedShards(allocation); + gatewayAllocator.applyStartedShards(allocation); if (withReroute) { reroute(allocation); } @@ -192,7 +194,7 @@ public class AllocationService extends AbstractComponent { if (!changed) { return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData()); } - shardsAllocators.applyFailedShards(allocation); + gatewayAllocator.applyFailedShards(allocation); reroute(allocation); final RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), routingNodes); String failedShardsAsString = firstListElementsToCommaDelimitedString(failedShards, s -> s.shard.shardId().toString()); @@ -306,14 +308,10 @@ public class AllocationService extends AbstractComponent { if (allocation.routingNodes().unassigned().size() > 0) { updateLeftDelayOfUnassignedShards(allocation, settings); - changed |= shardsAllocators.allocateUnassigned(allocation); + changed |= gatewayAllocator.allocateUnassigned(allocation); } - // move shards that no longer can be allocated - changed |= shardsAllocators.moveShards(allocation); - - // rebalance - changed |= shardsAllocators.rebalance(allocation); + changed |= shardsAllocator.allocate(allocation); assert RoutingNodes.assertShardStats(allocation.routingNodes()); return changed; } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index 0c40b26ca67..3e5b0847b0a 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -103,27 +103,26 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards } @Override - public void applyStartedShards(StartedRerouteAllocation allocation) { /* ONLY FOR GATEWAYS */ } - - @Override - public void applyFailedShards(FailedRerouteAllocation allocation) { /* ONLY FOR GATEWAYS */ } - - @Override - public boolean allocateUnassigned(RoutingAllocation allocation) { + public boolean allocate(RoutingAllocation allocation) { final Balancer balancer = new Balancer(logger, allocation, weightFunction, threshold); - return balancer.allocateUnassigned(); - } - - @Override - public boolean rebalance(RoutingAllocation allocation) { - final Balancer balancer = new Balancer(logger, allocation, weightFunction, threshold); - return balancer.balance(); - } - - @Override - public boolean moveShards(RoutingAllocation allocation) { - final Balancer balancer = new Balancer(logger, allocation, weightFunction, threshold); - return balancer.moveShards(); + boolean changed = false; + if (allocation.routingNodes().unassigned().size() > 0) { + changed |= balancer.allocateUnassigned(); + } + changed |= balancer.moveShards(); + if (allocation.hasPendingAsyncFetch() == false) { + /* + * see https://github.com/elastic/elasticsearch/issues/14387 + * if we allow rebalance operations while we are still fetching shard store data + * we might end up with unnecessary rebalance operations which can be super confusion/frustrating + * since once the fetches come back we might just move all the shards back again. + * Therefore we only do a rebalance if we have fetched all information. + */ + changed |= balancer.balance(); + } else { + logger.debug("skipping rebalance due to in-flight shard/store fetches"); + } + return changed; } /** diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocator.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocator.java index 4d9c05527d3..2656e2e3167 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocator.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocator.java @@ -19,11 +19,7 @@ package org.elasticsearch.cluster.routing.allocation.allocator; -import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.cluster.routing.ShardRoutingState; -import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; -import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation; /** *

@@ -34,41 +30,13 @@ import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation; */ public interface ShardsAllocator { - /** - * Applies changes on started nodes based on the implemented algorithm. For example if a - * shard has changed to {@link ShardRoutingState#STARTED} from {@link ShardRoutingState#RELOCATING} - * this allocator might apply some cleanups on the node that used to hold the shard. - * @param allocation all started {@link ShardRouting shards} - */ - void applyStartedShards(StartedRerouteAllocation allocation); - - /** - * Applies changes on failed nodes based on the implemented algorithm. - * @param allocation all failed {@link ShardRouting shards} - */ - void applyFailedShards(FailedRerouteAllocation allocation); - /** * Assign all unassigned shards to nodes - * - * @param allocation current node allocation - * @return true if the allocation has changed, otherwise false - */ - boolean allocateUnassigned(RoutingAllocation allocation); - - /** + * Move started shards that can not be allocated to a node anymore * Rebalancing number of shards on all nodes * * @param allocation current node allocation * @return true if the allocation has changed, otherwise false */ - boolean rebalance(RoutingAllocation allocation); - - /** - * Move started shards that can not be allocated to a node anymore - * - * @param allocation current node allocation - * @return true if the allocation has changed, otherwise false - */ - boolean moveShards(RoutingAllocation allocation); + boolean allocate(RoutingAllocation allocation); } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocators.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocators.java deleted file mode 100644 index f3eb1ebbf14..00000000000 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocators.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.cluster.routing.allocation.allocator; - -import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation; -import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; -import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation; -import org.elasticsearch.common.component.AbstractComponent; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.gateway.GatewayAllocator; - -/** - * The {@link ShardsAllocator} class offers methods for allocating shard within a cluster. - * These methods include moving shards and re-balancing the cluster. It also allows management - * of shards by their state. - */ -public class ShardsAllocators extends AbstractComponent implements ShardsAllocator { - - private final GatewayAllocator gatewayAllocator; - private final ShardsAllocator allocator; - - public ShardsAllocators(GatewayAllocator allocator) { - this(Settings.Builder.EMPTY_SETTINGS, allocator); - } - - public ShardsAllocators(Settings settings, GatewayAllocator allocator) { - this(settings, allocator, new BalancedShardsAllocator(settings)); - } - - @Inject - public ShardsAllocators(Settings settings, GatewayAllocator gatewayAllocator, ShardsAllocator allocator) { - super(settings); - this.gatewayAllocator = gatewayAllocator; - this.allocator = allocator; - } - - @Override - public void applyStartedShards(StartedRerouteAllocation allocation) { - gatewayAllocator.applyStartedShards(allocation); - allocator.applyStartedShards(allocation); - } - - @Override - public void applyFailedShards(FailedRerouteAllocation allocation) { - gatewayAllocator.applyFailedShards(allocation); - allocator.applyFailedShards(allocation); - } - - @Override - public boolean allocateUnassigned(RoutingAllocation allocation) { - boolean changed = false; - changed |= gatewayAllocator.allocateUnassigned(allocation); - changed |= allocator.allocateUnassigned(allocation); - return changed; - } - - protected long nanoTime() { - return System.nanoTime(); - } - - @Override - public boolean rebalance(RoutingAllocation allocation) { - if (allocation.hasPendingAsyncFetch() == false) { - /* - * see https://github.com/elastic/elasticsearch/issues/14387 - * if we allow rebalance operations while we are still fetching shard store data - * we might end up with unnecessary rebalance operations which can be super confusion/frustrating - * since once the fetches come back we might just move all the shards back again. - * Therefore we only do a rebalance if we have fetched all information. - */ - return allocator.rebalance(allocation); - } else { - logger.debug("skipping rebalance due to in-flight shard/store fetches"); - return false; - } - } - - @Override - public boolean moveShards(RoutingAllocation allocation) { - return allocator.moveShards(allocation); - } -} diff --git a/core/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java b/core/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java index 24635a980a7..42f0e3a0601 100644 --- a/core/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java @@ -48,19 +48,7 @@ public class ClusterModuleTests extends ModuleTestCase { static class FakeShardsAllocator implements ShardsAllocator { @Override - public void applyStartedShards(StartedRerouteAllocation allocation) {} - @Override - public void applyFailedShards(FailedRerouteAllocation allocation) {} - @Override - public boolean allocateUnassigned(RoutingAllocation allocation) { - return false; - } - @Override - public boolean rebalance(RoutingAllocation allocation) { - return false; - } - @Override - public boolean moveShards(RoutingAllocation allocation) { + public boolean allocate(RoutingAllocation allocation) { return false; } } diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/BalanceConfigurationTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/BalanceConfigurationTests.java index 707129578c9..56a66b52d6f 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/BalanceConfigurationTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/BalanceConfigurationTests.java @@ -35,7 +35,6 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator; -import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocators; import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.Loggers; @@ -311,29 +310,9 @@ public class BalanceConfigurationTests extends ESAllocationTestCase { public void testNoRebalanceOnPrimaryOverload() { Settings.Builder settings = settingsBuilder(); AllocationService strategy = new AllocationService(settings.build(), randomAllocationDeciders(settings.build(), - new ClusterSettings(Settings.Builder.EMPTY_SETTINGS, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), getRandom()), new ShardsAllocators(settings.build(), + new ClusterSettings(Settings.Builder.EMPTY_SETTINGS, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), getRandom()), NoopGatewayAllocator.INSTANCE, new ShardsAllocator() { - @Override - public boolean rebalance(RoutingAllocation allocation) { - return false; - } - - @Override - public boolean moveShards(RoutingAllocation allocation) { - return false; - } - - @Override - public void applyStartedShards(StartedRerouteAllocation allocation) { - - - } - - @Override - public void applyFailedShards(FailedRerouteAllocation allocation) { - } - /* * // this allocator tries to rebuild this scenario where a rebalance is * // triggered solely by the primary overload on node [1] where a shard @@ -354,9 +333,8 @@ public class BalanceConfigurationTests extends ESAllocationTestCase { --------[test][2], node[3], [P], s[STARTED] --------[test][3], node[3], [P], s[STARTED] ---- unassigned - */ - @Override - public boolean allocateUnassigned(RoutingAllocation allocation) { + */ + public boolean allocate(RoutingAllocation allocation) { RoutingNodes.UnassignedShards unassigned = allocation.routingNodes().unassigned(); boolean changed = !unassigned.isEmpty(); ShardRouting[] drain = unassigned.drain(); @@ -403,7 +381,7 @@ public class BalanceConfigurationTests extends ESAllocationTestCase { } return changed; } - }), EmptyClusterInfoService.INSTANCE); + }, EmptyClusterInfoService.INSTANCE); MetaData.Builder metaDataBuilder = MetaData.builder(); RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); IndexMetaData.Builder indexMeta = IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(5).numberOfReplicas(1); diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/NodeVersionAllocationDeciderTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/NodeVersionAllocationDeciderTests.java index 4e5be0f26b7..813bee8f80e 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/NodeVersionAllocationDeciderTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/NodeVersionAllocationDeciderTests.java @@ -36,7 +36,7 @@ import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.TestShardRouting; -import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocators; +import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; @@ -333,7 +333,7 @@ public class NodeVersionAllocationDeciderTests extends ESAllocationTestCase { AllocationDeciders allocationDeciders = new AllocationDeciders(Settings.EMPTY, new AllocationDecider[] {new NodeVersionAllocationDecider(Settings.EMPTY)}); AllocationService strategy = new MockAllocationService(Settings.EMPTY, allocationDeciders, - new ShardsAllocators(Settings.EMPTY, NoopGatewayAllocator.INSTANCE), EmptyClusterInfoService.INSTANCE); + NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE); RoutingAllocation.Result result = strategy.reroute(state, new AllocationCommands(), true); // the two indices must stay as is, the replicas cannot move to oldNode2 because versions don't match state = ClusterState.builder(state).routingResult(result).build(); @@ -363,7 +363,7 @@ public class NodeVersionAllocationDeciderTests extends ESAllocationTestCase { new NodeVersionAllocationDecider(Settings.EMPTY)}); AllocationService strategy = new MockAllocationService(Settings.EMPTY, allocationDeciders, - new ShardsAllocators(Settings.EMPTY, NoopGatewayAllocator.INSTANCE), EmptyClusterInfoService.INSTANCE); + NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE); RoutingAllocation.Result result = strategy.reroute(state, new AllocationCommands(), true); // Make sure that primary shards are only allocated on the new node diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/RandomAllocationDeciderTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/RandomAllocationDeciderTests.java index abc561a0916..0bdab7a1158 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/RandomAllocationDeciderTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/RandomAllocationDeciderTests.java @@ -30,7 +30,7 @@ import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; -import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocators; +import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; import org.elasticsearch.cluster.routing.allocation.decider.Decision; @@ -59,7 +59,7 @@ public class RandomAllocationDeciderTests extends ESAllocationTestCase { RandomAllocationDecider randomAllocationDecider = new RandomAllocationDecider(getRandom()); AllocationService strategy = new AllocationService(settingsBuilder().build(), new AllocationDeciders(Settings.EMPTY, new HashSet<>(Arrays.asList(new SameShardAllocationDecider(Settings.EMPTY), new ReplicaAfterPrimaryActiveAllocationDecider(Settings.EMPTY), - randomAllocationDecider))), new ShardsAllocators(NoopGatewayAllocator.INSTANCE), EmptyClusterInfoService.INSTANCE); + randomAllocationDecider))), NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE); int indices = scaledRandomIntBetween(1, 20); Builder metaBuilder = MetaData.builder(); int maxNumReplicas = 1; diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java index 0855263dd06..928756fec01 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java @@ -39,7 +39,7 @@ import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; -import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocators; +import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; import org.elasticsearch.cluster.routing.allocation.command.AllocationCommand; import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands; import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand; @@ -65,10 +65,6 @@ import static org.hamcrest.Matchers.nullValue; public class DiskThresholdDeciderTests extends ESAllocationTestCase { - private static ShardsAllocators makeShardsAllocators() { - return new ShardsAllocators(NoopGatewayAllocator.INSTANCE); - } - public void testDiskThreshold() { Settings diskSettings = settingsBuilder() .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), true) @@ -109,7 +105,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) - .build(), deciders, makeShardsAllocators(), cis); + .build(), deciders, NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), cis); MetaData metaData = MetaData.builder() .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)) @@ -194,7 +190,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) - .build(), deciders, makeShardsAllocators(), cis); + .build(), deciders, NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), cis); routingTable = strategy.reroute(clusterState, "reroute").routingTable(); clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); @@ -225,7 +221,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) - .build(), deciders, makeShardsAllocators(), cis); + .build(), deciders, NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), cis); routingTable = strategy.reroute(clusterState, "reroute").routingTable(); clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); @@ -305,7 +301,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) - .build(), deciders, makeShardsAllocators(), cis); + .build(), deciders, NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), cis); MetaData metaData = MetaData.builder() .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(2)) @@ -362,7 +358,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) - .build(), deciders, makeShardsAllocators(), cis); + .build(), deciders, NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), cis); routingTable = strategy.reroute(clusterState, "reroute").routingTable(); clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); @@ -429,7 +425,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) - .build(), deciders, makeShardsAllocators(), cis); + .build(), deciders, NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), cis); routingTable = strategy.reroute(clusterState, "reroute").routingTable(); clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); @@ -460,7 +456,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) - .build(), deciders, makeShardsAllocators(), cis); + .build(), deciders, NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), cis); routingTable = strategy.reroute(clusterState, "reroute").routingTable(); clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); @@ -569,7 +565,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) - .build(), deciders, makeShardsAllocators(), cis); + .build(), deciders, NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), cis); MetaData metaData = MetaData.builder() .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(0)) @@ -637,7 +633,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) - .build(), deciders, makeShardsAllocators(), cis); + .build(), deciders, NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), cis); MetaData metaData = MetaData.builder() .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(0)) @@ -740,7 +736,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) - .build(), deciders, makeShardsAllocators(), cis); + .build(), deciders, NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), cis); MetaData metaData = MetaData.builder() .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)) @@ -902,7 +898,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) - .build(), deciders, makeShardsAllocators(), cis); + .build(), deciders, NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), cis); // Ensure that the reroute call doesn't alter the routing table, since the first primary is relocating away // and therefor we will have sufficient disk space on node1. RoutingAllocation.Result result = strategy.reroute(clusterState, "reroute"); @@ -1003,7 +999,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) - .build(), deciders, makeShardsAllocators(), cis); + .build(), deciders, NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(Settings.EMPTY), cis); RoutingAllocation.Result result = strategy.reroute(clusterState, "reroute"); assertThat(result.routingTable().index("test").getShards().get(0).primaryShard().state(), equalTo(STARTED)); diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/NodeJoinControllerTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/NodeJoinControllerTests.java index 0ca261cbf65..67501d55a95 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/NodeJoinControllerTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/NodeJoinControllerTests.java @@ -492,7 +492,7 @@ public class NodeJoinControllerTests extends ESTestCase { static class NoopAllocationService extends AllocationService { public NoopAllocationService(Settings settings) { - super(settings, null, null, null); + super(settings, null, null, null, null); } @Override diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESAllocationTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESAllocationTestCase.java index f653819c140..1a38e32cf1a 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESAllocationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESAllocationTestCase.java @@ -33,7 +33,8 @@ import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation; -import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocators; +import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; +import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; import org.elasticsearch.cluster.routing.allocation.decider.Decision; @@ -79,19 +80,19 @@ public abstract class ESAllocationTestCase extends ESTestCase { public static MockAllocationService createAllocationService(Settings settings, ClusterSettings clusterSettings, Random random) { return new MockAllocationService(settings, randomAllocationDeciders(settings, clusterSettings, random), - new ShardsAllocators(settings, NoopGatewayAllocator.INSTANCE), EmptyClusterInfoService.INSTANCE); + NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(settings), EmptyClusterInfoService.INSTANCE); } public static MockAllocationService createAllocationService(Settings settings, ClusterInfoService clusterInfoService) { return new MockAllocationService(settings, randomAllocationDeciders(settings, new ClusterSettings(Settings.Builder.EMPTY_SETTINGS, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), getRandom()), - new ShardsAllocators(settings, NoopGatewayAllocator.INSTANCE), clusterInfoService); + NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(settings), clusterInfoService); } - public static MockAllocationService createAllocationService(Settings settings, GatewayAllocator allocator) { + public static MockAllocationService createAllocationService(Settings settings, GatewayAllocator gatewayAllocator) { return new MockAllocationService(settings, randomAllocationDeciders(settings, new ClusterSettings(Settings.Builder.EMPTY_SETTINGS, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), getRandom()), - new ShardsAllocators(settings, allocator), EmptyClusterInfoService.INSTANCE); + gatewayAllocator, new BalancedShardsAllocator(settings), EmptyClusterInfoService.INSTANCE); } @@ -193,8 +194,9 @@ public abstract class ESAllocationTestCase extends ESTestCase { private Long nanoTimeOverride = null; - public MockAllocationService(Settings settings, AllocationDeciders allocationDeciders, ShardsAllocators shardsAllocators, ClusterInfoService clusterInfoService) { - super(settings, allocationDeciders, shardsAllocators, clusterInfoService); + public MockAllocationService(Settings settings, AllocationDeciders allocationDeciders, GatewayAllocator gatewayAllocator, + ShardsAllocator shardsAllocator, ClusterInfoService clusterInfoService) { + super(settings, allocationDeciders, gatewayAllocator, shardsAllocator, clusterInfoService); } public void setNanoTimeOverride(long nanoTime) {