diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/ClusterModule.java index 79c1bea0667..7fa95f9187c 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/ClusterModule.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/ClusterModule.java @@ -35,7 +35,7 @@ import org.elasticsearch.cluster.metadata.MetaDataService; import org.elasticsearch.cluster.metadata.MetaDataStateIndexService; import org.elasticsearch.cluster.metadata.MetaDataUpdateSettingsService; import org.elasticsearch.cluster.routing.RoutingService; -import org.elasticsearch.cluster.routing.allocation.ShardAllocationModule; +import org.elasticsearch.cluster.routing.allocation.AllocationModule; import org.elasticsearch.cluster.routing.operation.OperationRoutingModule; import org.elasticsearch.cluster.service.InternalClusterService; import org.elasticsearch.common.collect.ImmutableList; @@ -56,7 +56,7 @@ public class ClusterModule extends AbstractModule implements SpawnModules { } @Override public Iterable spawnModules() { - return ImmutableList.of(new ShardAllocationModule(settings), new OperationRoutingModule(settings)); + return ImmutableList.of(new AllocationModule(settings), new OperationRoutingModule(settings)); } @Override diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java index d38de826bf5..f6bb94765b5 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java @@ -28,8 +28,8 @@ import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; -import org.elasticsearch.cluster.routing.allocation.ShardsAllocation; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; @@ -62,18 +62,18 @@ public class ShardStateAction extends AbstractComponent { private final ClusterService clusterService; - private final ShardsAllocation shardsAllocation; + private final AllocationService allocationService; private final ThreadPool threadPool; private final BlockingQueue startedShardsQueue = new LinkedTransferQueue(); @Inject public ShardStateAction(Settings settings, ClusterService clusterService, TransportService transportService, - ShardsAllocation shardsAllocation, ThreadPool threadPool) { + AllocationService allocationService, ThreadPool threadPool) { super(settings); this.clusterService = clusterService; this.transportService = transportService; - this.shardsAllocation = shardsAllocation; + this.allocationService = allocationService; this.threadPool = threadPool; transportService.registerHandler(ShardStartedTransportHandler.ACTION, new ShardStartedTransportHandler()); @@ -119,7 +119,7 @@ public class ShardStateAction extends AbstractComponent { if (logger.isDebugEnabled()) { logger.debug("Received failed shard {}, reason [{}]", shardRouting, reason); } - RoutingAllocation.Result routingResult = shardsAllocation.applyFailedShard(currentState, shardRouting); + RoutingAllocation.Result routingResult = allocationService.applyFailedShard(currentState, shardRouting); if (!routingResult.changed()) { return currentState; } @@ -185,7 +185,7 @@ public class ShardStateAction extends AbstractComponent { if (logger.isDebugEnabled()) { logger.debug("applying started shards {}, reason [{}]", shards, reason); } - RoutingAllocation.Result routingResult = shardsAllocation.applyStartedShards(currentState, shards); + RoutingAllocation.Result routingResult = allocationService.applyStartedShards(currentState, shards); if (!routingResult.changed()) { return currentState; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java index 511e2f3d156..013f4ed949e 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java @@ -28,8 +28,8 @@ import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; -import org.elasticsearch.cluster.routing.allocation.ShardsAllocation; import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.ImmutableList; import org.elasticsearch.common.collect.Lists; @@ -88,7 +88,7 @@ public class MetaDataCreateIndexService extends AbstractComponent { private final IndicesService indicesService; - private final ShardsAllocation shardsAllocation; + private final AllocationService allocationService; private final NodeIndexCreatedAction nodeIndexCreatedAction; @@ -97,13 +97,13 @@ public class MetaDataCreateIndexService extends AbstractComponent { private final String riverIndexName; @Inject public MetaDataCreateIndexService(Settings settings, Environment environment, ThreadPool threadPool, ClusterService clusterService, IndicesService indicesService, - ShardsAllocation shardsAllocation, NodeIndexCreatedAction nodeIndexCreatedAction, MetaDataService metaDataService, @RiverIndexName String riverIndexName) { + AllocationService allocationService, NodeIndexCreatedAction nodeIndexCreatedAction, MetaDataService metaDataService, @RiverIndexName String riverIndexName) { super(settings); this.environment = environment; this.threadPool = threadPool; this.clusterService = clusterService; this.indicesService = indicesService; - this.shardsAllocation = shardsAllocation; + this.allocationService = allocationService; this.nodeIndexCreatedAction = nodeIndexCreatedAction; this.metaDataService = metaDataService; this.riverIndexName = riverIndexName; @@ -280,7 +280,7 @@ public class MetaDataCreateIndexService extends AbstractComponent { IndexRoutingTable.Builder indexRoutingBuilder = new IndexRoutingTable.Builder(request.index) .initializeEmpty(updatedState.metaData().index(request.index), true); routingTableBuilder.add(indexRoutingBuilder); - RoutingAllocation.Result routingResult = shardsAllocation.reroute(newClusterStateBuilder().state(updatedState).routingTable(routingTableBuilder).build()); + RoutingAllocation.Result routingResult = allocationService.reroute(newClusterStateBuilder().state(updatedState).routingTable(routingTableBuilder).build()); updatedState = newClusterStateBuilder().state(updatedState).routingResult(routingResult).build(); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexService.java index 493232ed380..7cf4fbf2099 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexService.java @@ -25,8 +25,8 @@ import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.action.index.NodeIndexDeletedAction; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; -import org.elasticsearch.cluster.routing.allocation.ShardsAllocation; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -51,18 +51,18 @@ public class MetaDataDeleteIndexService extends AbstractComponent { private final ClusterService clusterService; - private final ShardsAllocation shardsAllocation; + private final AllocationService allocationService; private final NodeIndexDeletedAction nodeIndexDeletedAction; private final MetaDataService metaDataService; - @Inject public MetaDataDeleteIndexService(Settings settings, ThreadPool threadPool, ClusterService clusterService, ShardsAllocation shardsAllocation, + @Inject public MetaDataDeleteIndexService(Settings settings, ThreadPool threadPool, ClusterService clusterService, AllocationService allocationService, NodeIndexDeletedAction nodeIndexDeletedAction, MetaDataService metaDataService) { super(settings); this.threadPool = threadPool; this.clusterService = clusterService; - this.shardsAllocation = shardsAllocation; + this.allocationService = allocationService; this.nodeIndexDeletedAction = nodeIndexDeletedAction; this.metaDataService = metaDataService; } @@ -97,7 +97,7 @@ public class MetaDataDeleteIndexService extends AbstractComponent { .remove(request.index) .build(); - RoutingAllocation.Result routingResult = shardsAllocation.reroute( + RoutingAllocation.Result routingResult = allocationService.reroute( newClusterStateBuilder().state(currentState).routingTable(routingTableBuilder).metaData(newMetaData).build()); ClusterBlocks blocks = ClusterBlocks.builder().blocks(currentState.blocks()).removeIndexBlocks(request.index).build(); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataStateIndexService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataStateIndexService.java index 8173ac06440..333195011c2 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataStateIndexService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataStateIndexService.java @@ -27,8 +27,8 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; -import org.elasticsearch.cluster.routing.allocation.ShardsAllocation; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -47,12 +47,12 @@ public class MetaDataStateIndexService extends AbstractComponent { private final ClusterService clusterService; - private final ShardsAllocation shardsAllocation; + private final AllocationService allocationService; - @Inject public MetaDataStateIndexService(Settings settings, ClusterService clusterService, ShardsAllocation shardsAllocation) { + @Inject public MetaDataStateIndexService(Settings settings, ClusterService clusterService, AllocationService allocationService) { super(settings); this.clusterService = clusterService; - this.shardsAllocation = shardsAllocation; + this.allocationService = allocationService; } public void closeIndex(final Request request, final Listener listener) { @@ -85,7 +85,7 @@ public class MetaDataStateIndexService extends AbstractComponent { .routingTable(currentState.routingTable()) .remove(request.index); - RoutingAllocation.Result routingResult = shardsAllocation.reroute(newClusterStateBuilder().state(updatedState).routingTable(rtBuilder).build()); + RoutingAllocation.Result routingResult = allocationService.reroute(newClusterStateBuilder().state(updatedState).routingTable(rtBuilder).build()); return ClusterState.builder().state(updatedState).routingResult(routingResult).build(); } @@ -127,7 +127,7 @@ public class MetaDataStateIndexService extends AbstractComponent { .initializeEmpty(updatedState.metaData().index(request.index), false); rtBuilder.add(indexRoutingBuilder); - RoutingAllocation.Result routingResult = shardsAllocation.reroute(newClusterStateBuilder().state(updatedState).routingTable(rtBuilder).build()); + RoutingAllocation.Result routingResult = allocationService.reroute(newClusterStateBuilder().state(updatedState).routingTable(rtBuilder).build()); return ClusterState.builder().state(updatedState).routingResult(routingResult).build(); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataUpdateSettingsService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataUpdateSettingsService.java index c73262d0f76..ff464c62520 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataUpdateSettingsService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataUpdateSettingsService.java @@ -26,8 +26,8 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask; import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; -import org.elasticsearch.cluster.routing.allocation.ShardsAllocation; import org.elasticsearch.common.Booleans; import org.elasticsearch.common.collect.Sets; import org.elasticsearch.common.component.AbstractComponent; @@ -47,13 +47,13 @@ public class MetaDataUpdateSettingsService extends AbstractComponent implements private final ClusterService clusterService; - private final ShardsAllocation shardsAllocation; + private final AllocationService allocationService; - @Inject public MetaDataUpdateSettingsService(Settings settings, ClusterService clusterService, ShardsAllocation shardsAllocation) { + @Inject public MetaDataUpdateSettingsService(Settings settings, ClusterService clusterService, AllocationService allocationService) { super(settings); this.clusterService = clusterService; this.clusterService.add(this); - this.shardsAllocation = shardsAllocation; + this.allocationService = allocationService; } @Override public void clusterChanged(ClusterChangedEvent event) { @@ -191,7 +191,7 @@ public class MetaDataUpdateSettingsService extends AbstractComponent implements ClusterState updatedState = ClusterState.builder().state(currentState).metaData(metaDataBuilder).routingTable(routingTableBuilder).build(); // now, reroute in case things change that require it (like number of replicas) - RoutingAllocation.Result routingResult = shardsAllocation.reroute(updatedState); + RoutingAllocation.Result routingResult = allocationService.reroute(updatedState); updatedState = newClusterStateBuilder().state(updatedState).routingResult(routingResult).build(); return updatedState; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java index fb2feea1b78..f16d9c4ed24 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java @@ -25,8 +25,8 @@ import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; -import org.elasticsearch.cluster.routing.allocation.ShardsAllocation; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -49,7 +49,7 @@ public class RoutingService extends AbstractLifecycleComponent i private final ClusterService clusterService; - private final ShardsAllocation shardsAllocation; + private final AllocationService allocationService; private final TimeValue schedule; @@ -57,11 +57,11 @@ public class RoutingService extends AbstractLifecycleComponent i private volatile Future scheduledRoutingTableFuture; - @Inject public RoutingService(Settings settings, ThreadPool threadPool, ClusterService clusterService, ShardsAllocation shardsAllocation) { + @Inject public RoutingService(Settings settings, ThreadPool threadPool, ClusterService clusterService, AllocationService allocationService) { super(settings); this.threadPool = threadPool; this.clusterService = clusterService; - this.shardsAllocation = shardsAllocation; + this.allocationService = allocationService; this.schedule = componentSettings.getAsTime("schedule", timeValueSeconds(10)); } @@ -124,7 +124,7 @@ public class RoutingService extends AbstractLifecycleComponent i } clusterService.submitStateUpdateTask(CLUSTER_UPDATE_TASK_SOURCE, new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { - RoutingAllocation.Result routingResult = shardsAllocation.reroute(currentState); + RoutingAllocation.Result routingResult = allocationService.reroute(currentState); if (!routingResult.changed()) { // no state changed return currentState; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ShardAllocationModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationModule.java similarity index 77% rename from modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ShardAllocationModule.java rename to modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationModule.java index c228189b397..4681f2691b3 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ShardAllocationModule.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationModule.java @@ -19,8 +19,12 @@ package org.elasticsearch.cluster.routing.allocation; +import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocatorModule; +import org.elasticsearch.common.collect.ImmutableList; import org.elasticsearch.common.collect.Lists; import org.elasticsearch.common.inject.AbstractModule; +import org.elasticsearch.common.inject.Module; +import org.elasticsearch.common.inject.SpawnModules; import org.elasticsearch.common.inject.multibindings.Multibinder; import org.elasticsearch.common.settings.Settings; @@ -29,11 +33,18 @@ import java.util.List; /** * @author kimchy (shay.banon) */ -public class ShardAllocationModule extends AbstractModule { +public class AllocationModule extends AbstractModule implements SpawnModules { + + private final Settings settings; private List> allocations = Lists.newArrayList(); - public ShardAllocationModule(Settings settings) { + public AllocationModule(Settings settings) { + this.settings = settings; + } + + @Override public Iterable spawnModules() { + return ImmutableList.of(new ShardsAllocatorModule(settings)); } public void addNodeAllocation(Class nodeAllocation) { @@ -41,7 +52,7 @@ public class ShardAllocationModule extends AbstractModule { } @Override protected void configure() { - bind(ShardsAllocation.class).asEagerSingleton(); + bind(AllocationService.class).asEagerSingleton(); Multibinder allocationMultibinder = Multibinder.newSetBinder(binder(), NodeAllocation.class); allocationMultibinder.addBinding().to(SameShardNodeAllocation.class); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ShardsAllocation.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java similarity index 77% rename from modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ShardsAllocation.java rename to modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java index 78b96345590..a3238926193 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ShardsAllocation.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java @@ -28,6 +28,7 @@ import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocators; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.ImmutableSettings; @@ -44,21 +45,27 @@ import static org.elasticsearch.common.collect.Sets.*; /** * @author kimchy (shay.banon) */ -public class ShardsAllocation extends AbstractComponent { +public class AllocationService extends AbstractComponent { private final NodeAllocations nodeAllocations; - public ShardsAllocation() { + private final ShardsAllocators shardsAllocators; + + public AllocationService() { this(ImmutableSettings.Builder.EMPTY_SETTINGS); } - public ShardsAllocation(Settings settings) { - this(settings, new NodeAllocations(settings, new NodeSettingsService(ImmutableSettings.Builder.EMPTY_SETTINGS))); + public AllocationService(Settings settings) { + this(settings, + new NodeAllocations(settings, new NodeSettingsService(ImmutableSettings.Builder.EMPTY_SETTINGS)), + new ShardsAllocators(settings) + ); } - @Inject public ShardsAllocation(Settings settings, NodeAllocations nodeAllocations) { + @Inject public AllocationService(Settings settings, NodeAllocations nodeAllocations, ShardsAllocators shardsAllocators) { super(settings); this.nodeAllocations = nodeAllocations; + this.shardsAllocators = shardsAllocators; } /** @@ -69,11 +76,11 @@ public class ShardsAllocation extends AbstractComponent { public RoutingAllocation.Result applyStartedShards(ClusterState clusterState, List startedShards) { RoutingNodes routingNodes = clusterState.routingNodes(); StartedRerouteAllocation allocation = new StartedRerouteAllocation(routingNodes, clusterState.nodes(), startedShards); - nodeAllocations.applyStartedShards(nodeAllocations, allocation); boolean changed = applyStartedShards(routingNodes, startedShards); if (!changed) { return new RoutingAllocation.Result(false, clusterState.routingTable(), allocation.explanation()); } + shardsAllocators.applyStartedShards(nodeAllocations, allocation); reroute(allocation); return new RoutingAllocation.Result(true, new RoutingTable.Builder().updateNodes(routingNodes).build().validateRaiseException(clusterState.metaData()), allocation.explanation()); } @@ -90,7 +97,7 @@ public class ShardsAllocation extends AbstractComponent { if (!changed) { return new RoutingAllocation.Result(false, clusterState.routingTable(), allocation.explanation()); } - nodeAllocations.applyFailedShards(nodeAllocations, allocation); + shardsAllocators.applyFailedShards(nodeAllocations, allocation); reroute(allocation); return new RoutingAllocation.Result(true, new RoutingTable.Builder().updateNodes(routingNodes).build().validateRaiseException(clusterState.metaData()), allocation.explanation()); } @@ -151,73 +158,17 @@ public class ShardsAllocation extends AbstractComponent { // now allocate all the unassigned to available nodes if (allocation.routingNodes().hasUnassigned()) { - changed |= nodeAllocations.allocateUnassigned(nodeAllocations, allocation); - changed |= allocateUnassigned(allocation); + changed |= shardsAllocators.allocateUnassigned(nodeAllocations, allocation); // elect primaries again, in case this is needed with unassigned allocation changed |= electPrimaries(allocation.routingNodes()); } // rebalance - changed |= rebalance(allocation); + changed |= shardsAllocators.rebalance(nodeAllocations, allocation); return changed; } - private boolean rebalance(RoutingAllocation allocation) { - boolean changed = false; - List sortedNodesLeastToHigh = allocation.routingNodes().sortedNodesLeastToHigh(); - if (sortedNodesLeastToHigh.isEmpty()) { - return false; - } - int lowIndex = 0; - int highIndex = sortedNodesLeastToHigh.size() - 1; - boolean relocationPerformed; - do { - relocationPerformed = false; - while (lowIndex != highIndex) { - RoutingNode lowRoutingNode = sortedNodesLeastToHigh.get(lowIndex); - RoutingNode highRoutingNode = sortedNodesLeastToHigh.get(highIndex); - int averageNumOfShards = allocation.routingNodes().requiredAverageNumberOfShardsPerNode(); - - // only active shards can be removed so must count only active ones. - if (highRoutingNode.numberOfOwningShards() <= averageNumOfShards) { - highIndex--; - continue; - } - - if (lowRoutingNode.shards().size() >= averageNumOfShards) { - lowIndex++; - continue; - } - - boolean relocated = false; - List startedShards = highRoutingNode.shardsWithState(STARTED); - for (MutableShardRouting startedShard : startedShards) { - if (!nodeAllocations.canRebalance(startedShard, allocation)) { - continue; - } - - if (nodeAllocations.canAllocate(startedShard, lowRoutingNode, allocation).allocate()) { - changed = true; - lowRoutingNode.add(new MutableShardRouting(startedShard.index(), startedShard.id(), - lowRoutingNode.nodeId(), startedShard.currentNodeId(), - startedShard.primary(), INITIALIZING, startedShard.version() + 1)); - - startedShard.relocate(lowRoutingNode.nodeId()); - relocated = true; - relocationPerformed = true; - break; - } - } - - if (!relocated) { - highIndex--; - } - } - } while (relocationPerformed); - return changed; - } - private boolean electPrimaries(RoutingNodes routingNodes) { boolean changed = false; for (MutableShardRouting shardEntry : routingNodes.unassigned()) { @@ -248,56 +199,6 @@ public class ShardsAllocation extends AbstractComponent { return changed; } - private boolean allocateUnassigned(RoutingAllocation allocation) { - boolean changed = false; - RoutingNodes routingNodes = allocation.routingNodes(); - - - List nodes = routingNodes.sortedNodesLeastToHigh(); - - Iterator unassignedIterator = routingNodes.unassigned().iterator(); - int lastNode = 0; - - while (unassignedIterator.hasNext()) { - MutableShardRouting shard = unassignedIterator.next(); - // do the allocation, finding the least "busy" node - for (int i = 0; i < nodes.size(); i++) { - RoutingNode node = nodes.get(lastNode); - lastNode++; - if (lastNode == nodes.size()) { - lastNode = 0; - } - - if (nodeAllocations.canAllocate(shard, node, allocation).allocate()) { - int numberOfShardsToAllocate = routingNodes.requiredAverageNumberOfShardsPerNode() - node.shards().size(); - if (numberOfShardsToAllocate <= 0) { - continue; - } - - changed = true; - node.add(shard); - unassignedIterator.remove(); - break; - } - } - } - - // allocate all the unassigned shards above the average per node. - for (Iterator it = routingNodes.unassigned().iterator(); it.hasNext(); ) { - MutableShardRouting shard = it.next(); - // go over the nodes and try and allocate the remaining ones - for (RoutingNode routingNode : routingNodes.sortedNodesLeastToHigh()) { - if (nodeAllocations.canAllocate(shard, routingNode, allocation).allocate()) { - changed = true; - routingNode.add(shard); - it.remove(); - break; - } - } - } - return changed; - } - /** * Applies the new nodes to the routing nodes and returns them (just the * new nodes); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocation.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocation.java index cf24f48b89a..f1a4261878f 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocation.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocation.java @@ -35,15 +35,18 @@ public abstract class NodeAllocation extends AbstractComponent { YES { @Override public boolean allocate() { return true; - }}, + } + }, NO { @Override public boolean allocate() { return false; - }}, + } + }, THROTTLE { @Override public boolean allocate() { return false; - }}; + } + }; public abstract boolean allocate(); } @@ -52,17 +55,6 @@ public abstract class NodeAllocation extends AbstractComponent { super(settings); } - public void applyStartedShards(NodeAllocations nodeAllocations, StartedRerouteAllocation allocation) { - } - - public void applyFailedShards(NodeAllocations nodeAllocations, FailedRerouteAllocation allocation) { - - } - - public boolean allocateUnassigned(NodeAllocations nodeAllocations, RoutingAllocation allocation) { - return false; - } - public boolean canRebalance(ShardRouting shardRouting, RoutingAllocation allocation) { return true; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocations.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocations.java index dae7980fe75..9b77cccaf41 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocations.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocations.java @@ -54,18 +54,6 @@ public class NodeAllocations extends NodeAllocation { this.allocations = allocations.toArray(new NodeAllocation[allocations.size()]); } - @Override public void applyStartedShards(NodeAllocations nodeAllocations, StartedRerouteAllocation allocation) { - for (NodeAllocation allocation1 : allocations) { - allocation1.applyStartedShards(nodeAllocations, allocation); - } - } - - @Override public void applyFailedShards(NodeAllocations nodeAllocations, FailedRerouteAllocation allocation) { - for (NodeAllocation allocation1 : allocations) { - allocation1.applyFailedShards(nodeAllocations, allocation); - } - } - @Override public boolean canRebalance(ShardRouting shardRouting, RoutingAllocation allocation) { for (NodeAllocation allocation1 : allocations) { if (!allocation1.canRebalance(shardRouting, allocation)) { @@ -75,14 +63,6 @@ public class NodeAllocations extends NodeAllocation { return true; } - @Override public boolean allocateUnassigned(NodeAllocations nodeAllocations, RoutingAllocation allocation) { - boolean changed = false; - for (NodeAllocation allocation1 : allocations) { - changed |= allocation1.allocateUnassigned(nodeAllocations, allocation); - } - return changed; - } - @Override public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { Decision ret = Decision.YES; // first, check if its in the ignored, if so, return NO diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/EvenShardsCountAllocator.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/EvenShardsCountAllocator.java new file mode 100644 index 00000000000..11a426cecb4 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/EvenShardsCountAllocator.java @@ -0,0 +1,156 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.routing.allocation.allocator; + +import org.elasticsearch.cluster.routing.MutableShardRouting; +import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.cluster.routing.RoutingNodes; +import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation; +import org.elasticsearch.cluster.routing.allocation.NodeAllocations; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; +import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation; +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; + +import java.util.Iterator; +import java.util.List; + +import static org.elasticsearch.cluster.routing.ShardRoutingState.*; + +/** + */ +public class EvenShardsCountAllocator extends AbstractComponent implements ShardsAllocator { + + @Inject public EvenShardsCountAllocator(Settings settings) { + super(settings); + } + + @Override public void applyStartedShards(NodeAllocations nodeAllocations, StartedRerouteAllocation allocation) { + } + + @Override public void applyFailedShards(NodeAllocations nodeAllocations, FailedRerouteAllocation allocation) { + } + + @Override public boolean allocateUnassigned(NodeAllocations nodeAllocations, RoutingAllocation allocation) { + boolean changed = false; + RoutingNodes routingNodes = allocation.routingNodes(); + + + List nodes = routingNodes.sortedNodesLeastToHigh(); + + Iterator unassignedIterator = routingNodes.unassigned().iterator(); + int lastNode = 0; + + while (unassignedIterator.hasNext()) { + MutableShardRouting shard = unassignedIterator.next(); + // do the allocation, finding the least "busy" node + for (int i = 0; i < nodes.size(); i++) { + RoutingNode node = nodes.get(lastNode); + lastNode++; + if (lastNode == nodes.size()) { + lastNode = 0; + } + + if (nodeAllocations.canAllocate(shard, node, allocation).allocate()) { + int numberOfShardsToAllocate = routingNodes.requiredAverageNumberOfShardsPerNode() - node.shards().size(); + if (numberOfShardsToAllocate <= 0) { + continue; + } + + changed = true; + node.add(shard); + unassignedIterator.remove(); + break; + } + } + } + + // allocate all the unassigned shards above the average per node. + for (Iterator it = routingNodes.unassigned().iterator(); it.hasNext(); ) { + MutableShardRouting shard = it.next(); + // go over the nodes and try and allocate the remaining ones + for (RoutingNode routingNode : routingNodes.sortedNodesLeastToHigh()) { + if (nodeAllocations.canAllocate(shard, routingNode, allocation).allocate()) { + changed = true; + routingNode.add(shard); + it.remove(); + break; + } + } + } + return changed; + } + + @Override public boolean rebalance(NodeAllocations nodeAllocations, RoutingAllocation allocation) { + boolean changed = false; + List sortedNodesLeastToHigh = allocation.routingNodes().sortedNodesLeastToHigh(); + if (sortedNodesLeastToHigh.isEmpty()) { + return false; + } + int lowIndex = 0; + int highIndex = sortedNodesLeastToHigh.size() - 1; + boolean relocationPerformed; + do { + relocationPerformed = false; + while (lowIndex != highIndex) { + RoutingNode lowRoutingNode = sortedNodesLeastToHigh.get(lowIndex); + RoutingNode highRoutingNode = sortedNodesLeastToHigh.get(highIndex); + int averageNumOfShards = allocation.routingNodes().requiredAverageNumberOfShardsPerNode(); + + // only active shards can be removed so must count only active ones. + if (highRoutingNode.numberOfOwningShards() <= averageNumOfShards) { + highIndex--; + continue; + } + + if (lowRoutingNode.shards().size() >= averageNumOfShards) { + lowIndex++; + continue; + } + + boolean relocated = false; + List startedShards = highRoutingNode.shardsWithState(STARTED); + for (MutableShardRouting startedShard : startedShards) { + if (!nodeAllocations.canRebalance(startedShard, allocation)) { + continue; + } + + if (nodeAllocations.canAllocate(startedShard, lowRoutingNode, allocation).allocate()) { + changed = true; + lowRoutingNode.add(new MutableShardRouting(startedShard.index(), startedShard.id(), + lowRoutingNode.nodeId(), startedShard.currentNodeId(), + startedShard.primary(), INITIALIZING, startedShard.version() + 1)); + + startedShard.relocate(lowRoutingNode.nodeId()); + relocated = true; + relocationPerformed = true; + break; + } + } + + if (!relocated) { + highIndex--; + } + } + } while (relocationPerformed); + return changed; + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/GatewayAllocator.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/GatewayAllocator.java new file mode 100644 index 00000000000..d8ad63e1124 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/GatewayAllocator.java @@ -0,0 +1,37 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.routing.allocation.allocator; + +import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation; +import org.elasticsearch.cluster.routing.allocation.NodeAllocations; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; +import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation; + +/** + * The gateway allocator allows for a pluggable control of the gateway to allocate unassigned shards. + */ +public interface GatewayAllocator { + + void applyStartedShards(NodeAllocations nodeAllocations, StartedRerouteAllocation allocation); + + void applyFailedShards(NodeAllocations nodeAllocations, FailedRerouteAllocation allocation); + + boolean allocateUnassigned(NodeAllocations nodeAllocations, RoutingAllocation allocation); +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocator.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocator.java new file mode 100644 index 00000000000..fea1cf76c01 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocator.java @@ -0,0 +1,38 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.routing.allocation.allocator; + +import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation; +import org.elasticsearch.cluster.routing.allocation.NodeAllocations; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; +import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation; + +/** + */ +public interface ShardsAllocator { + + void applyStartedShards(NodeAllocations nodeAllocations, StartedRerouteAllocation allocation); + + void applyFailedShards(NodeAllocations nodeAllocations, FailedRerouteAllocation allocation); + + boolean allocateUnassigned(NodeAllocations nodeAllocations, RoutingAllocation allocation); + + boolean rebalance(NodeAllocations nodeAllocations, RoutingAllocation allocation); +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocatorModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocatorModule.java new file mode 100644 index 00000000000..3d401e2441f --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocatorModule.java @@ -0,0 +1,46 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.routing.allocation.allocator; + +import org.elasticsearch.common.inject.AbstractModule; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.gateway.none.NoneGatewayAllocator; + +/** + */ +public class ShardsAllocatorModule extends AbstractModule { + + private Settings settings; + + private Class gatewayAllocator = NoneGatewayAllocator.class; + + public ShardsAllocatorModule(Settings settings) { + this.settings = settings; + } + + public void setGatewayAllocator(Class gatewayAllocator) { + this.gatewayAllocator = gatewayAllocator; + } + + @Override protected void configure() { + bind(GatewayAllocator.class).to(gatewayAllocator).asEagerSingleton(); + bind(ShardsAllocator.class).to(EvenShardsCountAllocator.class).asEagerSingleton(); + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocators.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocators.java new file mode 100644 index 00000000000..6afc7805984 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocators.java @@ -0,0 +1,73 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.routing.allocation.allocator; + +import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation; +import org.elasticsearch.cluster.routing.allocation.NodeAllocations; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; +import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation; +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.gateway.none.NoneGatewayAllocator; + +/** + */ +public class ShardsAllocators extends AbstractComponent implements ShardsAllocator { + + private final GatewayAllocator gatewayAllocator; + private final ShardsAllocator allocator; + + public ShardsAllocators() { + this(ImmutableSettings.Builder.EMPTY_SETTINGS); + } + + public ShardsAllocators(Settings settings) { + this(settings, new NoneGatewayAllocator(), new EvenShardsCountAllocator(settings)); + } + + @Inject public ShardsAllocators(Settings settings, GatewayAllocator gatewayAllocator, ShardsAllocator allocator) { + super(settings); + this.gatewayAllocator = gatewayAllocator; + this.allocator = allocator; + } + + @Override public void applyStartedShards(NodeAllocations nodeAllocations, StartedRerouteAllocation allocation) { + gatewayAllocator.applyStartedShards(nodeAllocations, allocation); + allocator.applyStartedShards(nodeAllocations, allocation); + } + + @Override public void applyFailedShards(NodeAllocations nodeAllocations, FailedRerouteAllocation allocation) { + gatewayAllocator.applyFailedShards(nodeAllocations, allocation); + allocator.applyFailedShards(nodeAllocations, allocation); + } + + @Override public boolean allocateUnassigned(NodeAllocations nodeAllocations, RoutingAllocation allocation) { + boolean changed = false; + changed |= gatewayAllocator.allocateUnassigned(nodeAllocations, allocation); + changed |= allocator.allocateUnassigned(nodeAllocations, allocation); + return changed; + } + + @Override public boolean rebalance(NodeAllocations nodeAllocations, RoutingAllocation allocation) { + return allocator.rebalance(nodeAllocations, allocation); + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/GatewayService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/GatewayService.java index fa8b06e98d5..94bf0a37fa7 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/GatewayService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/GatewayService.java @@ -36,8 +36,8 @@ import org.elasticsearch.cluster.metadata.MetaDataStateIndexService; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; -import org.elasticsearch.cluster.routing.allocation.ShardsAllocation; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -64,7 +64,7 @@ public class GatewayService extends AbstractLifecycleComponent i private final ThreadPool threadPool; - private final ShardsAllocation shardsAllocation; + private final AllocationService allocationService; private final ClusterService clusterService; @@ -84,10 +84,10 @@ public class GatewayService extends AbstractLifecycleComponent i private final AtomicBoolean recovered = new AtomicBoolean(); private final AtomicBoolean scheduledRecovery = new AtomicBoolean(); - @Inject public GatewayService(Settings settings, Gateway gateway, ShardsAllocation shardsAllocation, ClusterService clusterService, DiscoveryService discoveryService, MetaDataCreateIndexService createIndexService, ThreadPool threadPool) { + @Inject public GatewayService(Settings settings, Gateway gateway, AllocationService allocationService, ClusterService clusterService, DiscoveryService discoveryService, MetaDataCreateIndexService createIndexService, ThreadPool threadPool) { super(settings); this.gateway = gateway; - this.shardsAllocation = shardsAllocation; + this.allocationService = allocationService; this.clusterService = clusterService; this.discoveryService = discoveryService; this.createIndexService = createIndexService; @@ -283,7 +283,7 @@ public class GatewayService extends AbstractLifecycleComponent i routingTableBuilder.version(recoveredState.version()); // now, reroute - RoutingAllocation.Result routingResult = shardsAllocation.reroute(newClusterStateBuilder().state(updatedState).routingTable(routingTableBuilder).build()); + RoutingAllocation.Result routingResult = allocationService.reroute(newClusterStateBuilder().state(updatedState).routingTable(routingTableBuilder).build()); return newClusterStateBuilder().state(updatedState).routingResult(routingResult).build(); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/blobstore/BlobReuseExistingNodeAllocation.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/blobstore/BlobReuseExistingGatewayAllocator.java similarity index 94% rename from modules/elasticsearch/src/main/java/org/elasticsearch/gateway/blobstore/BlobReuseExistingNodeAllocation.java rename to modules/elasticsearch/src/main/java/org/elasticsearch/gateway/blobstore/BlobReuseExistingGatewayAllocator.java index 921fa151ba2..d58af81fc46 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/blobstore/BlobReuseExistingNodeAllocation.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/blobstore/BlobReuseExistingGatewayAllocator.java @@ -26,9 +26,15 @@ import org.elasticsearch.cluster.routing.MutableShardRouting; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.cluster.routing.allocation.*; +import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation; +import org.elasticsearch.cluster.routing.allocation.NodeAllocation; +import org.elasticsearch.cluster.routing.allocation.NodeAllocations; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; +import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation; +import org.elasticsearch.cluster.routing.allocation.allocator.GatewayAllocator; import org.elasticsearch.common.collect.Maps; import org.elasticsearch.common.collect.Sets; +import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; @@ -51,7 +57,7 @@ import java.util.concurrent.ConcurrentMap; /** * @author kimchy (shay.banon) */ -public class BlobReuseExistingNodeAllocation extends NodeAllocation { +public class BlobReuseExistingGatewayAllocator extends AbstractComponent implements GatewayAllocator { private final Node node; @@ -63,8 +69,8 @@ public class BlobReuseExistingNodeAllocation extends NodeAllocation { private final ConcurrentMap> cachedStores = ConcurrentCollections.newConcurrentMap(); - @Inject public BlobReuseExistingNodeAllocation(Settings settings, Node node, - TransportNodesListShardStoreMetaData transportNodesListShardStoreMetaData) { + @Inject public BlobReuseExistingGatewayAllocator(Settings settings, Node node, + TransportNodesListShardStoreMetaData transportNodesListShardStoreMetaData) { super(settings); this.node = node; // YACK!, we need the Gateway, but it creates crazy circular dependency this.listShardStoreMetaData = transportNodesListShardStoreMetaData; @@ -144,7 +150,7 @@ public class BlobReuseExistingNodeAllocation extends NodeAllocation { // check if we can allocate on that node... // we only check for NO, since if this node is THROTTLING and it has enough "same data" // then we will try and assign it next time - if (nodeAllocations.canAllocate(shard, node, allocation) == Decision.NO) { + if (nodeAllocations.canAllocate(shard, node, allocation) == NodeAllocation.Decision.NO) { continue; } @@ -259,7 +265,7 @@ public class BlobReuseExistingNodeAllocation extends NodeAllocation { } else { nodesIds = Sets.newHashSet(); // clean nodes that have failed - for (Iterator it = shardStores.keySet().iterator(); it.hasNext();) { + for (Iterator it = shardStores.keySet().iterator(); it.hasNext(); ) { DiscoveryNode node = it.next(); if (!nodes.nodeExists(node.id())) { it.remove(); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/blobstore/BlobStoreGatewayModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/blobstore/BlobStoreGatewayModule.java index 043c0b7e3ac..20f192fda33 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/blobstore/BlobStoreGatewayModule.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/blobstore/BlobStoreGatewayModule.java @@ -19,7 +19,7 @@ package org.elasticsearch.gateway.blobstore; -import org.elasticsearch.cluster.routing.allocation.ShardAllocationModule; +import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocatorModule; import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.Module; import org.elasticsearch.common.inject.PreProcessModule; @@ -30,8 +30,8 @@ import org.elasticsearch.common.inject.PreProcessModule; public abstract class BlobStoreGatewayModule extends AbstractModule implements PreProcessModule { @Override public void processModule(Module module) { - if (module instanceof ShardAllocationModule) { - ((ShardAllocationModule) module).addNodeAllocation(BlobReuseExistingNodeAllocation.class); + if (module instanceof ShardsAllocatorModule) { + ((ShardsAllocatorModule) module).setGatewayAllocator(BlobReuseExistingGatewayAllocator.class); } } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayNodeAllocation.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayAllocator.java similarity index 96% rename from modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayNodeAllocation.java rename to modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayAllocator.java index ab6d31fca8f..31517010043 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayNodeAllocation.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayAllocator.java @@ -32,8 +32,10 @@ import org.elasticsearch.cluster.routing.allocation.NodeAllocation; import org.elasticsearch.cluster.routing.allocation.NodeAllocations; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation; +import org.elasticsearch.cluster.routing.allocation.allocator.GatewayAllocator; import org.elasticsearch.common.collect.Maps; import org.elasticsearch.common.collect.Sets; +import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.trove.iterator.TObjectLongIterator; @@ -54,7 +56,7 @@ import java.util.concurrent.ConcurrentMap; /** * @author kimchy (shay.banon) */ -public class LocalGatewayNodeAllocation extends NodeAllocation { +public class LocalGatewayAllocator extends AbstractComponent implements GatewayAllocator { static { IndexMetaData.addDynamicSettings("index.recovery.initial_shards"); @@ -72,8 +74,8 @@ public class LocalGatewayNodeAllocation extends NodeAllocation { private final String initialShards; - @Inject public LocalGatewayNodeAllocation(Settings settings, - TransportNodesListGatewayStartedShards listGatewayStartedShards, TransportNodesListShardStoreMetaData listShardStoreMetaData) { + @Inject public LocalGatewayAllocator(Settings settings, + TransportNodesListGatewayStartedShards listGatewayStartedShards, TransportNodesListShardStoreMetaData listShardStoreMetaData) { super(settings); this.listGatewayStartedShards = listGatewayStartedShards; this.listShardStoreMetaData = listShardStoreMetaData; @@ -187,10 +189,10 @@ public class LocalGatewayNodeAllocation extends NodeAllocation { Set noNodes = Sets.newHashSet(); for (DiscoveryNode discoNode : nodesWithHighestVersion) { RoutingNode node = routingNodes.node(discoNode.id()); - Decision decision = nodeAllocations.canAllocate(shard, node, allocation); + NodeAllocation.Decision decision = nodeAllocations.canAllocate(shard, node, allocation); if (decision == NodeAllocation.Decision.THROTTLE) { throttledNodes.add(discoNode); - } else if (decision == Decision.NO) { + } else if (decision == NodeAllocation.Decision.NO) { noNodes.add(discoNode); } else { if (logger.isDebugEnabled()) { @@ -284,7 +286,7 @@ public class LocalGatewayNodeAllocation extends NodeAllocation { // check if we can allocate on that node... // we only check for NO, since if this node is THROTTLING and it has enough "same data" // then we will try and assign it next time - if (nodeAllocations.canAllocate(shard, node, allocation) == Decision.NO) { + if (nodeAllocations.canAllocate(shard, node, allocation) == NodeAllocation.Decision.NO) { continue; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayModule.java index 8ad3ad68a52..bd199ffebf8 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayModule.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayModule.java @@ -19,7 +19,7 @@ package org.elasticsearch.gateway.local; -import org.elasticsearch.cluster.routing.allocation.ShardAllocationModule; +import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocatorModule; import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.Module; import org.elasticsearch.common.inject.PreProcessModule; @@ -37,8 +37,8 @@ public class LocalGatewayModule extends AbstractModule implements PreProcessModu } @Override public void processModule(Module module) { - if (module instanceof ShardAllocationModule) { - ((ShardAllocationModule) module).addNodeAllocation(LocalGatewayNodeAllocation.class); + if (module instanceof ShardsAllocatorModule) { + ((ShardsAllocatorModule) module).setGatewayAllocator(LocalGatewayAllocator.class); } } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/none/NoneGatewayAllocator.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/none/NoneGatewayAllocator.java new file mode 100644 index 00000000000..7513a7dddd5 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/none/NoneGatewayAllocator.java @@ -0,0 +1,41 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.gateway.none; + +import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation; +import org.elasticsearch.cluster.routing.allocation.NodeAllocations; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; +import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation; +import org.elasticsearch.cluster.routing.allocation.allocator.GatewayAllocator; + +/** + */ +public class NoneGatewayAllocator implements GatewayAllocator { + + @Override public void applyStartedShards(NodeAllocations nodeAllocations, StartedRerouteAllocation allocation) { + } + + @Override public void applyFailedShards(NodeAllocations nodeAllocations, FailedRerouteAllocation allocation) { + } + + @Override public boolean allocateUnassigned(NodeAllocations nodeAllocations, RoutingAllocation allocation) { + return false; + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/none/NoneGatewayModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/none/NoneGatewayModule.java index e4fd9502a21..510cad6bee8 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/none/NoneGatewayModule.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/none/NoneGatewayModule.java @@ -19,13 +19,21 @@ package org.elasticsearch.gateway.none; +import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocatorModule; import org.elasticsearch.common.inject.AbstractModule; +import org.elasticsearch.common.inject.Module; +import org.elasticsearch.common.inject.PreProcessModule; import org.elasticsearch.gateway.Gateway; /** - * @author kimchy (Shay Banon) */ -public class NoneGatewayModule extends AbstractModule { +public class NoneGatewayModule extends AbstractModule implements PreProcessModule { + + @Override public void processModule(Module module) { + if (module instanceof ShardsAllocatorModule) { + ((ShardsAllocatorModule) module).setGatewayAllocator(NoneGatewayAllocator.class); + } + } @Override protected void configure() { bind(Gateway.class).to(NoneGateway.class).asEagerSingleton(); diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ClusterRebalanceRoutingTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ClusterRebalanceRoutingTests.java index 15b92f38383..51f20f7f668 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ClusterRebalanceRoutingTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ClusterRebalanceRoutingTests.java @@ -44,7 +44,7 @@ public class ClusterRebalanceRoutingTests { private final ESLogger logger = Loggers.getLogger(ClusterRebalanceRoutingTests.class); @Test public void testAlways() { - ShardsAllocation strategy = new ShardsAllocation(settingsBuilder().put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceNodeAllocation.ClusterRebalanceType.ALWAYS.toString()).build()); + AllocationService strategy = new AllocationService(settingsBuilder().put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceNodeAllocation.ClusterRebalanceType.ALWAYS.toString()).build()); MetaData metaData = newMetaDataBuilder() .put(newIndexMetaDataBuilder("test1").numberOfShards(1).numberOfReplicas(1)) @@ -129,7 +129,7 @@ public class ClusterRebalanceRoutingTests { @Test public void testClusterPrimariesActive1() { - ShardsAllocation strategy = new ShardsAllocation(settingsBuilder().put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceNodeAllocation.ClusterRebalanceType.INDICES_PRIMARIES_ACTIVE.toString()).build()); + AllocationService strategy = new AllocationService(settingsBuilder().put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceNodeAllocation.ClusterRebalanceType.INDICES_PRIMARIES_ACTIVE.toString()).build()); MetaData metaData = newMetaDataBuilder() .put(newIndexMetaDataBuilder("test1").numberOfShards(1).numberOfReplicas(1)) @@ -232,7 +232,7 @@ public class ClusterRebalanceRoutingTests { } @Test public void testClusterPrimariesActive2() { - ShardsAllocation strategy = new ShardsAllocation(settingsBuilder().put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceNodeAllocation.ClusterRebalanceType.INDICES_PRIMARIES_ACTIVE.toString()).build()); + AllocationService strategy = new AllocationService(settingsBuilder().put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceNodeAllocation.ClusterRebalanceType.INDICES_PRIMARIES_ACTIVE.toString()).build()); MetaData metaData = newMetaDataBuilder() .put(newIndexMetaDataBuilder("test1").numberOfShards(1).numberOfReplicas(1)) @@ -315,7 +315,7 @@ public class ClusterRebalanceRoutingTests { } @Test public void testClusterAllActive1() { - ShardsAllocation strategy = new ShardsAllocation(settingsBuilder().put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceNodeAllocation.ClusterRebalanceType.INDICES_ALL_ACTIVE.toString()).build()); + AllocationService strategy = new AllocationService(settingsBuilder().put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceNodeAllocation.ClusterRebalanceType.INDICES_ALL_ACTIVE.toString()).build()); MetaData metaData = newMetaDataBuilder() .put(newIndexMetaDataBuilder("test1").numberOfShards(1).numberOfReplicas(1)) @@ -437,7 +437,7 @@ public class ClusterRebalanceRoutingTests { } @Test public void testClusterAllActive2() { - ShardsAllocation strategy = new ShardsAllocation(settingsBuilder().put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceNodeAllocation.ClusterRebalanceType.INDICES_ALL_ACTIVE.toString()).build()); + AllocationService strategy = new AllocationService(settingsBuilder().put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceNodeAllocation.ClusterRebalanceType.INDICES_ALL_ACTIVE.toString()).build()); MetaData metaData = newMetaDataBuilder() .put(newIndexMetaDataBuilder("test1").numberOfShards(1).numberOfReplicas(1)) @@ -520,7 +520,7 @@ public class ClusterRebalanceRoutingTests { } @Test public void testClusterAllActive3() { - ShardsAllocation strategy = new ShardsAllocation(settingsBuilder().put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceNodeAllocation.ClusterRebalanceType.INDICES_ALL_ACTIVE.toString()).build()); + AllocationService strategy = new AllocationService(settingsBuilder().put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceNodeAllocation.ClusterRebalanceType.INDICES_ALL_ACTIVE.toString()).build()); MetaData metaData = newMetaDataBuilder() .put(newIndexMetaDataBuilder("test1").numberOfShards(1).numberOfReplicas(1)) diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ConcurrentRebalanceRoutingTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ConcurrentRebalanceRoutingTests.java index 92580dac57e..a075d5576e9 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ConcurrentRebalanceRoutingTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ConcurrentRebalanceRoutingTests.java @@ -44,7 +44,7 @@ public class ConcurrentRebalanceRoutingTests { private final ESLogger logger = Loggers.getLogger(ConcurrentRebalanceRoutingTests.class); @Test public void testClusterConcurrentRebalance() { - ShardsAllocation strategy = new ShardsAllocation(settingsBuilder() + AllocationService strategy = new AllocationService(settingsBuilder() .put("cluster.routing.allocation.concurrent_recoveries", 10) .put("cluster.routing.allocation.allow_rebalance", "always") .put("cluster.routing.allocation.cluster_concurrent_rebalance", 3) diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ElectReplicaAsPrimaryDuringRelocationTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ElectReplicaAsPrimaryDuringRelocationTests.java index 7e029a81437..b5bd0711857 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ElectReplicaAsPrimaryDuringRelocationTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ElectReplicaAsPrimaryDuringRelocationTests.java @@ -47,7 +47,7 @@ public class ElectReplicaAsPrimaryDuringRelocationTests { private final ESLogger logger = Loggers.getLogger(ElectReplicaAsPrimaryDuringRelocationTests.class); @Test public void testElectReplicaAsPrimaryDuringRelocation() { - ShardsAllocation strategy = new ShardsAllocation(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build()); + AllocationService strategy = new AllocationService(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build()); logger.info("Building initial routing table"); diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedNodeRoutingTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedNodeRoutingTests.java index 4f81d7c04ef..0b5e8f4fcc5 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedNodeRoutingTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedNodeRoutingTests.java @@ -44,7 +44,7 @@ public class FailedNodeRoutingTests { private final ESLogger logger = Loggers.getLogger(FailedNodeRoutingTests.class); @Test public void simpleFailedNodeTest() { - ShardsAllocation strategy = new ShardsAllocation(settingsBuilder().put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceNodeAllocation.ClusterRebalanceType.ALWAYS.toString()).build()); + AllocationService strategy = new AllocationService(settingsBuilder().put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceNodeAllocation.ClusterRebalanceType.ALWAYS.toString()).build()); MetaData metaData = newMetaDataBuilder() .put(newIndexMetaDataBuilder("test1").numberOfShards(1).numberOfReplicas(1)) @@ -103,7 +103,7 @@ public class FailedNodeRoutingTests { } @Test public void simpleFailedNodeTestNoReassign() { - ShardsAllocation strategy = new ShardsAllocation(settingsBuilder().put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceNodeAllocation.ClusterRebalanceType.ALWAYS.toString()).build()); + AllocationService strategy = new AllocationService(settingsBuilder().put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceNodeAllocation.ClusterRebalanceType.ALWAYS.toString()).build()); MetaData metaData = newMetaDataBuilder() .put(newIndexMetaDataBuilder("test1").numberOfShards(1).numberOfReplicas(1)) diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java index b04045a9d7d..bae094ac335 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java @@ -49,7 +49,7 @@ public class FailedShardsRoutingTests { private final ESLogger logger = Loggers.getLogger(FailedShardsRoutingTests.class); @Test public void failPrimaryStartedCheckReplicaElected() { - ShardsAllocation strategy = new ShardsAllocation(settingsBuilder() + AllocationService strategy = new AllocationService(settingsBuilder() .put("cluster.routing.allocation.concurrent_recoveries", 10) .put("cluster.routing.allocation.allow_rebalance", "always") .build()); @@ -129,7 +129,7 @@ public class FailedShardsRoutingTests { } @Test public void firstAllocationFailureSingleNode() { - ShardsAllocation strategy = new ShardsAllocation(settingsBuilder() + AllocationService strategy = new AllocationService(settingsBuilder() .put("cluster.routing.allocation.concurrent_recoveries", 10) .put("cluster.routing.allocation.allow_rebalance", "always") .build()); @@ -185,7 +185,7 @@ public class FailedShardsRoutingTests { } @Test public void firstAllocationFailureTwoNodes() { - ShardsAllocation strategy = new ShardsAllocation(settingsBuilder() + AllocationService strategy = new AllocationService(settingsBuilder() .put("cluster.routing.allocation.concurrent_recoveries", 10) .put("cluster.routing.allocation.allow_rebalance", "always") .build()); @@ -241,7 +241,7 @@ public class FailedShardsRoutingTests { } @Test public void rebalanceFailure() { - ShardsAllocation strategy = new ShardsAllocation(settingsBuilder() + AllocationService strategy = new AllocationService(settingsBuilder() .put("cluster.routing.allocation.concurrent_recoveries", 10) .put("cluster.routing.allocation.allow_rebalance", "always") .build()); diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/PrimaryElectionRoutingTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/PrimaryElectionRoutingTests.java index 0c44293f81b..76a66a696be 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/PrimaryElectionRoutingTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/PrimaryElectionRoutingTests.java @@ -46,7 +46,7 @@ public class PrimaryElectionRoutingTests { private final ESLogger logger = Loggers.getLogger(PrimaryElectionRoutingTests.class); @Test public void testBackupElectionToPrimaryWhenPrimaryCanBeAllocatedToAnotherNode() { - ShardsAllocation strategy = new ShardsAllocation(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build()); + AllocationService strategy = new AllocationService(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build()); logger.info("Building initial routing table"); diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/PrimaryNotRelocatedWhileBeingRecoveredTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/PrimaryNotRelocatedWhileBeingRecoveredTests.java index 3242d796fad..a2b6da7dadf 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/PrimaryNotRelocatedWhileBeingRecoveredTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/PrimaryNotRelocatedWhileBeingRecoveredTests.java @@ -47,7 +47,7 @@ public class PrimaryNotRelocatedWhileBeingRecoveredTests { @Test public void testPrimaryNotRelocatedWhileBeingRecoveredFrom() { - ShardsAllocation strategy = new ShardsAllocation(settingsBuilder() + AllocationService strategy = new AllocationService(settingsBuilder() .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put("cluster.routing.allocation.node_initial_primaries_recoveries", 10) .build()); diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/RebalanceAfterActiveTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/RebalanceAfterActiveTests.java index 09c4757345f..c9d26a0714e 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/RebalanceAfterActiveTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/RebalanceAfterActiveTests.java @@ -47,7 +47,7 @@ public class RebalanceAfterActiveTests { private final ESLogger logger = Loggers.getLogger(RebalanceAfterActiveTests.class); @Test public void testRebalanceOnlyAfterAllShardsAreActive() { - ShardsAllocation strategy = new ShardsAllocation(settingsBuilder() + AllocationService strategy = new AllocationService(settingsBuilder() .put("cluster.routing.allocation.concurrent_recoveries", 10) .put("cluster.routing.allocation.allow_rebalance", "always") .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ReplicaAllocatedAfterPrimaryTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ReplicaAllocatedAfterPrimaryTests.java index 6779f7ae5bd..d48f8a3e135 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ReplicaAllocatedAfterPrimaryTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ReplicaAllocatedAfterPrimaryTests.java @@ -46,7 +46,7 @@ public class ReplicaAllocatedAfterPrimaryTests { private final ESLogger logger = Loggers.getLogger(ReplicaAllocatedAfterPrimaryTests.class); @Test public void testBackupIsAllocatedAfterPrimary() { - ShardsAllocation strategy = new ShardsAllocation(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build()); + AllocationService strategy = new AllocationService(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build()); logger.info("Building initial routing table"); diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ShardVersioningTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ShardVersioningTests.java index 7b288290491..96fdd548125 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ShardVersioningTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ShardVersioningTests.java @@ -44,7 +44,7 @@ public class ShardVersioningTests { private final ESLogger logger = Loggers.getLogger(ShardVersioningTests.class); @Test public void simple() { - ShardsAllocation strategy = new ShardsAllocation(settingsBuilder().put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceNodeAllocation.ClusterRebalanceType.ALWAYS.toString()).build()); + AllocationService strategy = new AllocationService(settingsBuilder().put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceNodeAllocation.ClusterRebalanceType.ALWAYS.toString()).build()); MetaData metaData = newMetaDataBuilder() .put(newIndexMetaDataBuilder("test1").numberOfShards(1).numberOfReplicas(1)) diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/SingleShardNoReplicasRoutingTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/SingleShardNoReplicasRoutingTests.java index 491af02aa56..d9e0583b936 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/SingleShardNoReplicasRoutingTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/SingleShardNoReplicasRoutingTests.java @@ -55,7 +55,7 @@ public class SingleShardNoReplicasRoutingTests { private final ESLogger logger = Loggers.getLogger(SingleShardNoReplicasRoutingTests.class); @Test public void testSingleIndexStartedShard() { - ShardsAllocation strategy = new ShardsAllocation(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build()); + AllocationService strategy = new AllocationService(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build()); logger.info("Building initial routing table"); @@ -156,7 +156,7 @@ public class SingleShardNoReplicasRoutingTests { } @Test public void testSingleIndexShardFailed() { - ShardsAllocation strategy = new ShardsAllocation(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build()); + AllocationService strategy = new AllocationService(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build()); logger.info("Building initial routing table"); @@ -205,7 +205,7 @@ public class SingleShardNoReplicasRoutingTests { } @Test public void testMultiIndexEvenDistribution() { - ShardsAllocation strategy = new ShardsAllocation(settingsBuilder() + AllocationService strategy = new AllocationService(settingsBuilder() .put("cluster.routing.allocation.concurrent_recoveries", 10) .put("cluster.routing.allocation.allow_rebalance", "always") .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) @@ -317,7 +317,7 @@ public class SingleShardNoReplicasRoutingTests { } @Test public void testMultiIndexUnevenNodes() { - ShardsAllocation strategy = new ShardsAllocation(settingsBuilder() + AllocationService strategy = new AllocationService(settingsBuilder() .put("cluster.routing.allocation.concurrent_recoveries", 10) .put("cluster.routing.allocation.allow_rebalance", "always") .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/SingleShardOneReplicaRoutingTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/SingleShardOneReplicaRoutingTests.java index 59bcd7c390d..0093a3929f8 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/SingleShardOneReplicaRoutingTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/SingleShardOneReplicaRoutingTests.java @@ -46,7 +46,7 @@ public class SingleShardOneReplicaRoutingTests { private final ESLogger logger = Loggers.getLogger(SingleShardOneReplicaRoutingTests.class); @Test public void testSingleIndexFirstStartPrimaryThenBackups() { - ShardsAllocation strategy = new ShardsAllocation(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build()); + AllocationService strategy = new AllocationService(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build()); logger.info("Building initial routing table"); diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/TenShardsOneReplicaRoutingTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/TenShardsOneReplicaRoutingTests.java index eb05c0a0514..5e4aefc7e1c 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/TenShardsOneReplicaRoutingTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/TenShardsOneReplicaRoutingTests.java @@ -46,7 +46,7 @@ public class TenShardsOneReplicaRoutingTests { private final ESLogger logger = Loggers.getLogger(TenShardsOneReplicaRoutingTests.class); @Test public void testSingleIndexFirstStartPrimaryThenBackups() { - ShardsAllocation strategy = new ShardsAllocation(settingsBuilder() + AllocationService strategy = new AllocationService(settingsBuilder() .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put("cluster.routing.allocation.node_initial_primaries_recoveries", 10) .put("cluster.routing.allocation.allow_rebalance", "always") diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ThrottlingAllocationTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ThrottlingAllocationTests.java index ebfee63766c..d91cc6b11f1 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ThrottlingAllocationTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ThrottlingAllocationTests.java @@ -45,7 +45,7 @@ public class ThrottlingAllocationTests { private final ESLogger logger = Loggers.getLogger(ThrottlingAllocationTests.class); @Test public void testPrimaryRecoveryThrottling() { - ShardsAllocation strategy = new ShardsAllocation(settingsBuilder() + AllocationService strategy = new AllocationService(settingsBuilder() .put("cluster.routing.allocation.node_concurrent_recoveries", 3) .put("cluster.routing.allocation.node_initial_primaries_recoveries", 3) .build()); @@ -105,7 +105,7 @@ public class ThrottlingAllocationTests { } @Test public void testReplicaAndPrimaryRecoveryThrottling() { - ShardsAllocation strategy = new ShardsAllocation(settingsBuilder() + AllocationService strategy = new AllocationService(settingsBuilder() .put("cluster.routing.allocation.concurrent_recoveries", 3) .put("cluster.routing.allocation.node_initial_primaries_recoveries", 3) .build()); diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/UpdateNumberOfReplicasTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/UpdateNumberOfReplicasTests.java index b3db39dd14c..eaf3bd87ca9 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/UpdateNumberOfReplicasTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/UpdateNumberOfReplicasTests.java @@ -27,7 +27,7 @@ public class UpdateNumberOfReplicasTests { private final ESLogger logger = Loggers.getLogger(UpdateNumberOfReplicasTests.class); @Test public void testUpdateNumberOfReplicas() { - ShardsAllocation strategy = new ShardsAllocation(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build()); + AllocationService strategy = new AllocationService(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build()); logger.info("Building initial routing table"); diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/serialization/ClusterSerializationTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/serialization/ClusterSerializationTests.java index fe24a042668..30535619e33 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/serialization/ClusterSerializationTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/serialization/ClusterSerializationTests.java @@ -24,7 +24,7 @@ import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.RoutingTable; -import org.elasticsearch.cluster.routing.allocation.ShardsAllocation; +import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.common.io.stream.BytesStreamInput; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.transport.DummyTransportAddress; @@ -55,7 +55,7 @@ public class ClusterSerializationTests { ClusterState clusterState = newClusterStateBuilder().nodes(nodes).metaData(metaData).routingTable(routingTable).build(); - ShardsAllocation strategy = new ShardsAllocation(); + AllocationService strategy = new AllocationService(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(strategy.reroute(clusterState).routingTable()).build(); ClusterState serializedClusterState = ClusterState.Builder.fromBytes(ClusterState.Builder.toBytes(clusterState), newNode("node1")); @@ -77,7 +77,7 @@ public class ClusterSerializationTests { ClusterState clusterState = newClusterStateBuilder().nodes(nodes).metaData(metaData).routingTable(routingTable).build(); - ShardsAllocation strategy = new ShardsAllocation(); + AllocationService strategy = new AllocationService(); RoutingTable source = strategy.reroute(clusterState).routingTable(); BytesStreamOutput outStream = new BytesStreamOutput();