extract logic of allocation to a separate module

This commit is contained in:
Shay Banon 2011-09-06 17:11:55 +03:00
parent 37f08ea8b8
commit d86c116273
39 changed files with 533 additions and 242 deletions

View File

@ -35,7 +35,7 @@ import org.elasticsearch.cluster.metadata.MetaDataService;
import org.elasticsearch.cluster.metadata.MetaDataStateIndexService;
import org.elasticsearch.cluster.metadata.MetaDataUpdateSettingsService;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.allocation.ShardAllocationModule;
import org.elasticsearch.cluster.routing.allocation.AllocationModule;
import org.elasticsearch.cluster.routing.operation.OperationRoutingModule;
import org.elasticsearch.cluster.service.InternalClusterService;
import org.elasticsearch.common.collect.ImmutableList;
@ -56,7 +56,7 @@ public class ClusterModule extends AbstractModule implements SpawnModules {
}
@Override public Iterable<? extends Module> spawnModules() {
return ImmutableList.of(new ShardAllocationModule(settings), new OperationRoutingModule(settings));
return ImmutableList.of(new AllocationModule(settings), new OperationRoutingModule(settings));
}
@Override

View File

@ -28,8 +28,8 @@ import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.ShardsAllocation;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
@ -62,18 +62,18 @@ public class ShardStateAction extends AbstractComponent {
private final ClusterService clusterService;
private final ShardsAllocation shardsAllocation;
private final AllocationService allocationService;
private final ThreadPool threadPool;
private final BlockingQueue<ShardRouting> startedShardsQueue = new LinkedTransferQueue<ShardRouting>();
@Inject public ShardStateAction(Settings settings, ClusterService clusterService, TransportService transportService,
ShardsAllocation shardsAllocation, ThreadPool threadPool) {
AllocationService allocationService, ThreadPool threadPool) {
super(settings);
this.clusterService = clusterService;
this.transportService = transportService;
this.shardsAllocation = shardsAllocation;
this.allocationService = allocationService;
this.threadPool = threadPool;
transportService.registerHandler(ShardStartedTransportHandler.ACTION, new ShardStartedTransportHandler());
@ -119,7 +119,7 @@ public class ShardStateAction extends AbstractComponent {
if (logger.isDebugEnabled()) {
logger.debug("Received failed shard {}, reason [{}]", shardRouting, reason);
}
RoutingAllocation.Result routingResult = shardsAllocation.applyFailedShard(currentState, shardRouting);
RoutingAllocation.Result routingResult = allocationService.applyFailedShard(currentState, shardRouting);
if (!routingResult.changed()) {
return currentState;
}
@ -185,7 +185,7 @@ public class ShardStateAction extends AbstractComponent {
if (logger.isDebugEnabled()) {
logger.debug("applying started shards {}, reason [{}]", shards, reason);
}
RoutingAllocation.Result routingResult = shardsAllocation.applyStartedShards(currentState, shards);
RoutingAllocation.Result routingResult = allocationService.applyStartedShards(currentState, shards);
if (!routingResult.changed()) {
return currentState;
}

View File

@ -28,8 +28,8 @@ import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.ShardsAllocation;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.collect.Lists;
@ -88,7 +88,7 @@ public class MetaDataCreateIndexService extends AbstractComponent {
private final IndicesService indicesService;
private final ShardsAllocation shardsAllocation;
private final AllocationService allocationService;
private final NodeIndexCreatedAction nodeIndexCreatedAction;
@ -97,13 +97,13 @@ public class MetaDataCreateIndexService extends AbstractComponent {
private final String riverIndexName;
@Inject public MetaDataCreateIndexService(Settings settings, Environment environment, ThreadPool threadPool, ClusterService clusterService, IndicesService indicesService,
ShardsAllocation shardsAllocation, NodeIndexCreatedAction nodeIndexCreatedAction, MetaDataService metaDataService, @RiverIndexName String riverIndexName) {
AllocationService allocationService, NodeIndexCreatedAction nodeIndexCreatedAction, MetaDataService metaDataService, @RiverIndexName String riverIndexName) {
super(settings);
this.environment = environment;
this.threadPool = threadPool;
this.clusterService = clusterService;
this.indicesService = indicesService;
this.shardsAllocation = shardsAllocation;
this.allocationService = allocationService;
this.nodeIndexCreatedAction = nodeIndexCreatedAction;
this.metaDataService = metaDataService;
this.riverIndexName = riverIndexName;
@ -280,7 +280,7 @@ public class MetaDataCreateIndexService extends AbstractComponent {
IndexRoutingTable.Builder indexRoutingBuilder = new IndexRoutingTable.Builder(request.index)
.initializeEmpty(updatedState.metaData().index(request.index), true);
routingTableBuilder.add(indexRoutingBuilder);
RoutingAllocation.Result routingResult = shardsAllocation.reroute(newClusterStateBuilder().state(updatedState).routingTable(routingTableBuilder).build());
RoutingAllocation.Result routingResult = allocationService.reroute(newClusterStateBuilder().state(updatedState).routingTable(routingTableBuilder).build());
updatedState = newClusterStateBuilder().state(updatedState).routingResult(routingResult).build();
}

View File

@ -25,8 +25,8 @@ import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.action.index.NodeIndexDeletedAction;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.ShardsAllocation;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
@ -51,18 +51,18 @@ public class MetaDataDeleteIndexService extends AbstractComponent {
private final ClusterService clusterService;
private final ShardsAllocation shardsAllocation;
private final AllocationService allocationService;
private final NodeIndexDeletedAction nodeIndexDeletedAction;
private final MetaDataService metaDataService;
@Inject public MetaDataDeleteIndexService(Settings settings, ThreadPool threadPool, ClusterService clusterService, ShardsAllocation shardsAllocation,
@Inject public MetaDataDeleteIndexService(Settings settings, ThreadPool threadPool, ClusterService clusterService, AllocationService allocationService,
NodeIndexDeletedAction nodeIndexDeletedAction, MetaDataService metaDataService) {
super(settings);
this.threadPool = threadPool;
this.clusterService = clusterService;
this.shardsAllocation = shardsAllocation;
this.allocationService = allocationService;
this.nodeIndexDeletedAction = nodeIndexDeletedAction;
this.metaDataService = metaDataService;
}
@ -97,7 +97,7 @@ public class MetaDataDeleteIndexService extends AbstractComponent {
.remove(request.index)
.build();
RoutingAllocation.Result routingResult = shardsAllocation.reroute(
RoutingAllocation.Result routingResult = allocationService.reroute(
newClusterStateBuilder().state(currentState).routingTable(routingTableBuilder).metaData(newMetaData).build());
ClusterBlocks blocks = ClusterBlocks.builder().blocks(currentState.blocks()).removeIndexBlocks(request.index).build();

View File

@ -27,8 +27,8 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.ShardsAllocation;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
@ -47,12 +47,12 @@ public class MetaDataStateIndexService extends AbstractComponent {
private final ClusterService clusterService;
private final ShardsAllocation shardsAllocation;
private final AllocationService allocationService;
@Inject public MetaDataStateIndexService(Settings settings, ClusterService clusterService, ShardsAllocation shardsAllocation) {
@Inject public MetaDataStateIndexService(Settings settings, ClusterService clusterService, AllocationService allocationService) {
super(settings);
this.clusterService = clusterService;
this.shardsAllocation = shardsAllocation;
this.allocationService = allocationService;
}
public void closeIndex(final Request request, final Listener listener) {
@ -85,7 +85,7 @@ public class MetaDataStateIndexService extends AbstractComponent {
.routingTable(currentState.routingTable())
.remove(request.index);
RoutingAllocation.Result routingResult = shardsAllocation.reroute(newClusterStateBuilder().state(updatedState).routingTable(rtBuilder).build());
RoutingAllocation.Result routingResult = allocationService.reroute(newClusterStateBuilder().state(updatedState).routingTable(rtBuilder).build());
return ClusterState.builder().state(updatedState).routingResult(routingResult).build();
}
@ -127,7 +127,7 @@ public class MetaDataStateIndexService extends AbstractComponent {
.initializeEmpty(updatedState.metaData().index(request.index), false);
rtBuilder.add(indexRoutingBuilder);
RoutingAllocation.Result routingResult = shardsAllocation.reroute(newClusterStateBuilder().state(updatedState).routingTable(rtBuilder).build());
RoutingAllocation.Result routingResult = allocationService.reroute(newClusterStateBuilder().state(updatedState).routingTable(rtBuilder).build());
return ClusterState.builder().state(updatedState).routingResult(routingResult).build();
}

View File

@ -26,8 +26,8 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.ShardsAllocation;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.collect.Sets;
import org.elasticsearch.common.component.AbstractComponent;
@ -47,13 +47,13 @@ public class MetaDataUpdateSettingsService extends AbstractComponent implements
private final ClusterService clusterService;
private final ShardsAllocation shardsAllocation;
private final AllocationService allocationService;
@Inject public MetaDataUpdateSettingsService(Settings settings, ClusterService clusterService, ShardsAllocation shardsAllocation) {
@Inject public MetaDataUpdateSettingsService(Settings settings, ClusterService clusterService, AllocationService allocationService) {
super(settings);
this.clusterService = clusterService;
this.clusterService.add(this);
this.shardsAllocation = shardsAllocation;
this.allocationService = allocationService;
}
@Override public void clusterChanged(ClusterChangedEvent event) {
@ -191,7 +191,7 @@ public class MetaDataUpdateSettingsService extends AbstractComponent implements
ClusterState updatedState = ClusterState.builder().state(currentState).metaData(metaDataBuilder).routingTable(routingTableBuilder).build();
// now, reroute in case things change that require it (like number of replicas)
RoutingAllocation.Result routingResult = shardsAllocation.reroute(updatedState);
RoutingAllocation.Result routingResult = allocationService.reroute(updatedState);
updatedState = newClusterStateBuilder().state(updatedState).routingResult(routingResult).build();
return updatedState;

View File

@ -25,8 +25,8 @@ import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.ShardsAllocation;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
@ -49,7 +49,7 @@ public class RoutingService extends AbstractLifecycleComponent<RoutingService> i
private final ClusterService clusterService;
private final ShardsAllocation shardsAllocation;
private final AllocationService allocationService;
private final TimeValue schedule;
@ -57,11 +57,11 @@ public class RoutingService extends AbstractLifecycleComponent<RoutingService> i
private volatile Future scheduledRoutingTableFuture;
@Inject public RoutingService(Settings settings, ThreadPool threadPool, ClusterService clusterService, ShardsAllocation shardsAllocation) {
@Inject public RoutingService(Settings settings, ThreadPool threadPool, ClusterService clusterService, AllocationService allocationService) {
super(settings);
this.threadPool = threadPool;
this.clusterService = clusterService;
this.shardsAllocation = shardsAllocation;
this.allocationService = allocationService;
this.schedule = componentSettings.getAsTime("schedule", timeValueSeconds(10));
}
@ -124,7 +124,7 @@ public class RoutingService extends AbstractLifecycleComponent<RoutingService> i
}
clusterService.submitStateUpdateTask(CLUSTER_UPDATE_TASK_SOURCE, new ClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) {
RoutingAllocation.Result routingResult = shardsAllocation.reroute(currentState);
RoutingAllocation.Result routingResult = allocationService.reroute(currentState);
if (!routingResult.changed()) {
// no state changed
return currentState;

View File

@ -19,8 +19,12 @@
package org.elasticsearch.cluster.routing.allocation;
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocatorModule;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.inject.SpawnModules;
import org.elasticsearch.common.inject.multibindings.Multibinder;
import org.elasticsearch.common.settings.Settings;
@ -29,11 +33,18 @@ import java.util.List;
/**
* @author kimchy (shay.banon)
*/
public class ShardAllocationModule extends AbstractModule {
public class AllocationModule extends AbstractModule implements SpawnModules {
private final Settings settings;
private List<Class<? extends NodeAllocation>> allocations = Lists.newArrayList();
public ShardAllocationModule(Settings settings) {
public AllocationModule(Settings settings) {
this.settings = settings;
}
@Override public Iterable<? extends Module> spawnModules() {
return ImmutableList.of(new ShardsAllocatorModule(settings));
}
public void addNodeAllocation(Class<? extends NodeAllocation> nodeAllocation) {
@ -41,7 +52,7 @@ public class ShardAllocationModule extends AbstractModule {
}
@Override protected void configure() {
bind(ShardsAllocation.class).asEagerSingleton();
bind(AllocationService.class).asEagerSingleton();
Multibinder<NodeAllocation> allocationMultibinder = Multibinder.newSetBinder(binder(), NodeAllocation.class);
allocationMultibinder.addBinding().to(SameShardNodeAllocation.class);

View File

@ -28,6 +28,7 @@ import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocators;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.ImmutableSettings;
@ -44,21 +45,27 @@ import static org.elasticsearch.common.collect.Sets.*;
/**
* @author kimchy (shay.banon)
*/
public class ShardsAllocation extends AbstractComponent {
public class AllocationService extends AbstractComponent {
private final NodeAllocations nodeAllocations;
public ShardsAllocation() {
private final ShardsAllocators shardsAllocators;
public AllocationService() {
this(ImmutableSettings.Builder.EMPTY_SETTINGS);
}
public ShardsAllocation(Settings settings) {
this(settings, new NodeAllocations(settings, new NodeSettingsService(ImmutableSettings.Builder.EMPTY_SETTINGS)));
public AllocationService(Settings settings) {
this(settings,
new NodeAllocations(settings, new NodeSettingsService(ImmutableSettings.Builder.EMPTY_SETTINGS)),
new ShardsAllocators(settings)
);
}
@Inject public ShardsAllocation(Settings settings, NodeAllocations nodeAllocations) {
@Inject public AllocationService(Settings settings, NodeAllocations nodeAllocations, ShardsAllocators shardsAllocators) {
super(settings);
this.nodeAllocations = nodeAllocations;
this.shardsAllocators = shardsAllocators;
}
/**
@ -69,11 +76,11 @@ public class ShardsAllocation extends AbstractComponent {
public RoutingAllocation.Result applyStartedShards(ClusterState clusterState, List<? extends ShardRouting> startedShards) {
RoutingNodes routingNodes = clusterState.routingNodes();
StartedRerouteAllocation allocation = new StartedRerouteAllocation(routingNodes, clusterState.nodes(), startedShards);
nodeAllocations.applyStartedShards(nodeAllocations, allocation);
boolean changed = applyStartedShards(routingNodes, startedShards);
if (!changed) {
return new RoutingAllocation.Result(false, clusterState.routingTable(), allocation.explanation());
}
shardsAllocators.applyStartedShards(nodeAllocations, allocation);
reroute(allocation);
return new RoutingAllocation.Result(true, new RoutingTable.Builder().updateNodes(routingNodes).build().validateRaiseException(clusterState.metaData()), allocation.explanation());
}
@ -90,7 +97,7 @@ public class ShardsAllocation extends AbstractComponent {
if (!changed) {
return new RoutingAllocation.Result(false, clusterState.routingTable(), allocation.explanation());
}
nodeAllocations.applyFailedShards(nodeAllocations, allocation);
shardsAllocators.applyFailedShards(nodeAllocations, allocation);
reroute(allocation);
return new RoutingAllocation.Result(true, new RoutingTable.Builder().updateNodes(routingNodes).build().validateRaiseException(clusterState.metaData()), allocation.explanation());
}
@ -151,73 +158,17 @@ public class ShardsAllocation extends AbstractComponent {
// now allocate all the unassigned to available nodes
if (allocation.routingNodes().hasUnassigned()) {
changed |= nodeAllocations.allocateUnassigned(nodeAllocations, allocation);
changed |= allocateUnassigned(allocation);
changed |= shardsAllocators.allocateUnassigned(nodeAllocations, allocation);
// elect primaries again, in case this is needed with unassigned allocation
changed |= electPrimaries(allocation.routingNodes());
}
// rebalance
changed |= rebalance(allocation);
changed |= shardsAllocators.rebalance(nodeAllocations, allocation);
return changed;
}
private boolean rebalance(RoutingAllocation allocation) {
boolean changed = false;
List<RoutingNode> sortedNodesLeastToHigh = allocation.routingNodes().sortedNodesLeastToHigh();
if (sortedNodesLeastToHigh.isEmpty()) {
return false;
}
int lowIndex = 0;
int highIndex = sortedNodesLeastToHigh.size() - 1;
boolean relocationPerformed;
do {
relocationPerformed = false;
while (lowIndex != highIndex) {
RoutingNode lowRoutingNode = sortedNodesLeastToHigh.get(lowIndex);
RoutingNode highRoutingNode = sortedNodesLeastToHigh.get(highIndex);
int averageNumOfShards = allocation.routingNodes().requiredAverageNumberOfShardsPerNode();
// only active shards can be removed so must count only active ones.
if (highRoutingNode.numberOfOwningShards() <= averageNumOfShards) {
highIndex--;
continue;
}
if (lowRoutingNode.shards().size() >= averageNumOfShards) {
lowIndex++;
continue;
}
boolean relocated = false;
List<MutableShardRouting> startedShards = highRoutingNode.shardsWithState(STARTED);
for (MutableShardRouting startedShard : startedShards) {
if (!nodeAllocations.canRebalance(startedShard, allocation)) {
continue;
}
if (nodeAllocations.canAllocate(startedShard, lowRoutingNode, allocation).allocate()) {
changed = true;
lowRoutingNode.add(new MutableShardRouting(startedShard.index(), startedShard.id(),
lowRoutingNode.nodeId(), startedShard.currentNodeId(),
startedShard.primary(), INITIALIZING, startedShard.version() + 1));
startedShard.relocate(lowRoutingNode.nodeId());
relocated = true;
relocationPerformed = true;
break;
}
}
if (!relocated) {
highIndex--;
}
}
} while (relocationPerformed);
return changed;
}
private boolean electPrimaries(RoutingNodes routingNodes) {
boolean changed = false;
for (MutableShardRouting shardEntry : routingNodes.unassigned()) {
@ -248,56 +199,6 @@ public class ShardsAllocation extends AbstractComponent {
return changed;
}
private boolean allocateUnassigned(RoutingAllocation allocation) {
boolean changed = false;
RoutingNodes routingNodes = allocation.routingNodes();
List<RoutingNode> nodes = routingNodes.sortedNodesLeastToHigh();
Iterator<MutableShardRouting> unassignedIterator = routingNodes.unassigned().iterator();
int lastNode = 0;
while (unassignedIterator.hasNext()) {
MutableShardRouting shard = unassignedIterator.next();
// do the allocation, finding the least "busy" node
for (int i = 0; i < nodes.size(); i++) {
RoutingNode node = nodes.get(lastNode);
lastNode++;
if (lastNode == nodes.size()) {
lastNode = 0;
}
if (nodeAllocations.canAllocate(shard, node, allocation).allocate()) {
int numberOfShardsToAllocate = routingNodes.requiredAverageNumberOfShardsPerNode() - node.shards().size();
if (numberOfShardsToAllocate <= 0) {
continue;
}
changed = true;
node.add(shard);
unassignedIterator.remove();
break;
}
}
}
// allocate all the unassigned shards above the average per node.
for (Iterator<MutableShardRouting> it = routingNodes.unassigned().iterator(); it.hasNext(); ) {
MutableShardRouting shard = it.next();
// go over the nodes and try and allocate the remaining ones
for (RoutingNode routingNode : routingNodes.sortedNodesLeastToHigh()) {
if (nodeAllocations.canAllocate(shard, routingNode, allocation).allocate()) {
changed = true;
routingNode.add(shard);
it.remove();
break;
}
}
}
return changed;
}
/**
* Applies the new nodes to the routing nodes and returns them (just the
* new nodes);

View File

@ -35,15 +35,18 @@ public abstract class NodeAllocation extends AbstractComponent {
YES {
@Override public boolean allocate() {
return true;
}},
}
},
NO {
@Override public boolean allocate() {
return false;
}},
}
},
THROTTLE {
@Override public boolean allocate() {
return false;
}};
}
};
public abstract boolean allocate();
}
@ -52,17 +55,6 @@ public abstract class NodeAllocation extends AbstractComponent {
super(settings);
}
public void applyStartedShards(NodeAllocations nodeAllocations, StartedRerouteAllocation allocation) {
}
public void applyFailedShards(NodeAllocations nodeAllocations, FailedRerouteAllocation allocation) {
}
public boolean allocateUnassigned(NodeAllocations nodeAllocations, RoutingAllocation allocation) {
return false;
}
public boolean canRebalance(ShardRouting shardRouting, RoutingAllocation allocation) {
return true;
}

View File

@ -54,18 +54,6 @@ public class NodeAllocations extends NodeAllocation {
this.allocations = allocations.toArray(new NodeAllocation[allocations.size()]);
}
@Override public void applyStartedShards(NodeAllocations nodeAllocations, StartedRerouteAllocation allocation) {
for (NodeAllocation allocation1 : allocations) {
allocation1.applyStartedShards(nodeAllocations, allocation);
}
}
@Override public void applyFailedShards(NodeAllocations nodeAllocations, FailedRerouteAllocation allocation) {
for (NodeAllocation allocation1 : allocations) {
allocation1.applyFailedShards(nodeAllocations, allocation);
}
}
@Override public boolean canRebalance(ShardRouting shardRouting, RoutingAllocation allocation) {
for (NodeAllocation allocation1 : allocations) {
if (!allocation1.canRebalance(shardRouting, allocation)) {
@ -75,14 +63,6 @@ public class NodeAllocations extends NodeAllocation {
return true;
}
@Override public boolean allocateUnassigned(NodeAllocations nodeAllocations, RoutingAllocation allocation) {
boolean changed = false;
for (NodeAllocation allocation1 : allocations) {
changed |= allocation1.allocateUnassigned(nodeAllocations, allocation);
}
return changed;
}
@Override public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
Decision ret = Decision.YES;
// first, check if its in the ignored, if so, return NO

View File

@ -0,0 +1,156 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.routing.allocation.allocator;
import org.elasticsearch.cluster.routing.MutableShardRouting;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
import org.elasticsearch.cluster.routing.allocation.NodeAllocations;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import java.util.Iterator;
import java.util.List;
import static org.elasticsearch.cluster.routing.ShardRoutingState.*;
/**
*/
public class EvenShardsCountAllocator extends AbstractComponent implements ShardsAllocator {
@Inject public EvenShardsCountAllocator(Settings settings) {
super(settings);
}
@Override public void applyStartedShards(NodeAllocations nodeAllocations, StartedRerouteAllocation allocation) {
}
@Override public void applyFailedShards(NodeAllocations nodeAllocations, FailedRerouteAllocation allocation) {
}
@Override public boolean allocateUnassigned(NodeAllocations nodeAllocations, RoutingAllocation allocation) {
boolean changed = false;
RoutingNodes routingNodes = allocation.routingNodes();
List<RoutingNode> nodes = routingNodes.sortedNodesLeastToHigh();
Iterator<MutableShardRouting> unassignedIterator = routingNodes.unassigned().iterator();
int lastNode = 0;
while (unassignedIterator.hasNext()) {
MutableShardRouting shard = unassignedIterator.next();
// do the allocation, finding the least "busy" node
for (int i = 0; i < nodes.size(); i++) {
RoutingNode node = nodes.get(lastNode);
lastNode++;
if (lastNode == nodes.size()) {
lastNode = 0;
}
if (nodeAllocations.canAllocate(shard, node, allocation).allocate()) {
int numberOfShardsToAllocate = routingNodes.requiredAverageNumberOfShardsPerNode() - node.shards().size();
if (numberOfShardsToAllocate <= 0) {
continue;
}
changed = true;
node.add(shard);
unassignedIterator.remove();
break;
}
}
}
// allocate all the unassigned shards above the average per node.
for (Iterator<MutableShardRouting> it = routingNodes.unassigned().iterator(); it.hasNext(); ) {
MutableShardRouting shard = it.next();
// go over the nodes and try and allocate the remaining ones
for (RoutingNode routingNode : routingNodes.sortedNodesLeastToHigh()) {
if (nodeAllocations.canAllocate(shard, routingNode, allocation).allocate()) {
changed = true;
routingNode.add(shard);
it.remove();
break;
}
}
}
return changed;
}
@Override public boolean rebalance(NodeAllocations nodeAllocations, RoutingAllocation allocation) {
boolean changed = false;
List<RoutingNode> sortedNodesLeastToHigh = allocation.routingNodes().sortedNodesLeastToHigh();
if (sortedNodesLeastToHigh.isEmpty()) {
return false;
}
int lowIndex = 0;
int highIndex = sortedNodesLeastToHigh.size() - 1;
boolean relocationPerformed;
do {
relocationPerformed = false;
while (lowIndex != highIndex) {
RoutingNode lowRoutingNode = sortedNodesLeastToHigh.get(lowIndex);
RoutingNode highRoutingNode = sortedNodesLeastToHigh.get(highIndex);
int averageNumOfShards = allocation.routingNodes().requiredAverageNumberOfShardsPerNode();
// only active shards can be removed so must count only active ones.
if (highRoutingNode.numberOfOwningShards() <= averageNumOfShards) {
highIndex--;
continue;
}
if (lowRoutingNode.shards().size() >= averageNumOfShards) {
lowIndex++;
continue;
}
boolean relocated = false;
List<MutableShardRouting> startedShards = highRoutingNode.shardsWithState(STARTED);
for (MutableShardRouting startedShard : startedShards) {
if (!nodeAllocations.canRebalance(startedShard, allocation)) {
continue;
}
if (nodeAllocations.canAllocate(startedShard, lowRoutingNode, allocation).allocate()) {
changed = true;
lowRoutingNode.add(new MutableShardRouting(startedShard.index(), startedShard.id(),
lowRoutingNode.nodeId(), startedShard.currentNodeId(),
startedShard.primary(), INITIALIZING, startedShard.version() + 1));
startedShard.relocate(lowRoutingNode.nodeId());
relocated = true;
relocationPerformed = true;
break;
}
}
if (!relocated) {
highIndex--;
}
}
} while (relocationPerformed);
return changed;
}
}

View File

@ -0,0 +1,37 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.routing.allocation.allocator;
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
import org.elasticsearch.cluster.routing.allocation.NodeAllocations;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation;
/**
* The gateway allocator allows for a pluggable control of the gateway to allocate unassigned shards.
*/
public interface GatewayAllocator {
void applyStartedShards(NodeAllocations nodeAllocations, StartedRerouteAllocation allocation);
void applyFailedShards(NodeAllocations nodeAllocations, FailedRerouteAllocation allocation);
boolean allocateUnassigned(NodeAllocations nodeAllocations, RoutingAllocation allocation);
}

View File

@ -0,0 +1,38 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.routing.allocation.allocator;
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
import org.elasticsearch.cluster.routing.allocation.NodeAllocations;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation;
/**
*/
public interface ShardsAllocator {
void applyStartedShards(NodeAllocations nodeAllocations, StartedRerouteAllocation allocation);
void applyFailedShards(NodeAllocations nodeAllocations, FailedRerouteAllocation allocation);
boolean allocateUnassigned(NodeAllocations nodeAllocations, RoutingAllocation allocation);
boolean rebalance(NodeAllocations nodeAllocations, RoutingAllocation allocation);
}

View File

@ -0,0 +1,46 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.routing.allocation.allocator;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.gateway.none.NoneGatewayAllocator;
/**
*/
public class ShardsAllocatorModule extends AbstractModule {
private Settings settings;
private Class<? extends GatewayAllocator> gatewayAllocator = NoneGatewayAllocator.class;
public ShardsAllocatorModule(Settings settings) {
this.settings = settings;
}
public void setGatewayAllocator(Class<? extends GatewayAllocator> gatewayAllocator) {
this.gatewayAllocator = gatewayAllocator;
}
@Override protected void configure() {
bind(GatewayAllocator.class).to(gatewayAllocator).asEagerSingleton();
bind(ShardsAllocator.class).to(EvenShardsCountAllocator.class).asEagerSingleton();
}
}

View File

@ -0,0 +1,73 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.routing.allocation.allocator;
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
import org.elasticsearch.cluster.routing.allocation.NodeAllocations;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.gateway.none.NoneGatewayAllocator;
/**
*/
public class ShardsAllocators extends AbstractComponent implements ShardsAllocator {
private final GatewayAllocator gatewayAllocator;
private final ShardsAllocator allocator;
public ShardsAllocators() {
this(ImmutableSettings.Builder.EMPTY_SETTINGS);
}
public ShardsAllocators(Settings settings) {
this(settings, new NoneGatewayAllocator(), new EvenShardsCountAllocator(settings));
}
@Inject public ShardsAllocators(Settings settings, GatewayAllocator gatewayAllocator, ShardsAllocator allocator) {
super(settings);
this.gatewayAllocator = gatewayAllocator;
this.allocator = allocator;
}
@Override public void applyStartedShards(NodeAllocations nodeAllocations, StartedRerouteAllocation allocation) {
gatewayAllocator.applyStartedShards(nodeAllocations, allocation);
allocator.applyStartedShards(nodeAllocations, allocation);
}
@Override public void applyFailedShards(NodeAllocations nodeAllocations, FailedRerouteAllocation allocation) {
gatewayAllocator.applyFailedShards(nodeAllocations, allocation);
allocator.applyFailedShards(nodeAllocations, allocation);
}
@Override public boolean allocateUnassigned(NodeAllocations nodeAllocations, RoutingAllocation allocation) {
boolean changed = false;
changed |= gatewayAllocator.allocateUnassigned(nodeAllocations, allocation);
changed |= allocator.allocateUnassigned(nodeAllocations, allocation);
return changed;
}
@Override public boolean rebalance(NodeAllocations nodeAllocations, RoutingAllocation allocation) {
return allocator.rebalance(nodeAllocations, allocation);
}
}

View File

@ -36,8 +36,8 @@ import org.elasticsearch.cluster.metadata.MetaDataStateIndexService;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.ShardsAllocation;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
@ -64,7 +64,7 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
private final ThreadPool threadPool;
private final ShardsAllocation shardsAllocation;
private final AllocationService allocationService;
private final ClusterService clusterService;
@ -84,10 +84,10 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
private final AtomicBoolean recovered = new AtomicBoolean();
private final AtomicBoolean scheduledRecovery = new AtomicBoolean();
@Inject public GatewayService(Settings settings, Gateway gateway, ShardsAllocation shardsAllocation, ClusterService clusterService, DiscoveryService discoveryService, MetaDataCreateIndexService createIndexService, ThreadPool threadPool) {
@Inject public GatewayService(Settings settings, Gateway gateway, AllocationService allocationService, ClusterService clusterService, DiscoveryService discoveryService, MetaDataCreateIndexService createIndexService, ThreadPool threadPool) {
super(settings);
this.gateway = gateway;
this.shardsAllocation = shardsAllocation;
this.allocationService = allocationService;
this.clusterService = clusterService;
this.discoveryService = discoveryService;
this.createIndexService = createIndexService;
@ -283,7 +283,7 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
routingTableBuilder.version(recoveredState.version());
// now, reroute
RoutingAllocation.Result routingResult = shardsAllocation.reroute(newClusterStateBuilder().state(updatedState).routingTable(routingTableBuilder).build());
RoutingAllocation.Result routingResult = allocationService.reroute(newClusterStateBuilder().state(updatedState).routingTable(routingTableBuilder).build());
return newClusterStateBuilder().state(updatedState).routingResult(routingResult).build();
}

View File

@ -26,9 +26,15 @@ import org.elasticsearch.cluster.routing.MutableShardRouting;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.*;
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
import org.elasticsearch.cluster.routing.allocation.NodeAllocation;
import org.elasticsearch.cluster.routing.allocation.NodeAllocations;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation;
import org.elasticsearch.cluster.routing.allocation.allocator.GatewayAllocator;
import org.elasticsearch.common.collect.Maps;
import org.elasticsearch.common.collect.Sets;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
@ -51,7 +57,7 @@ import java.util.concurrent.ConcurrentMap;
/**
* @author kimchy (shay.banon)
*/
public class BlobReuseExistingNodeAllocation extends NodeAllocation {
public class BlobReuseExistingGatewayAllocator extends AbstractComponent implements GatewayAllocator {
private final Node node;
@ -63,8 +69,8 @@ public class BlobReuseExistingNodeAllocation extends NodeAllocation {
private final ConcurrentMap<ShardId, Map<DiscoveryNode, TransportNodesListShardStoreMetaData.StoreFilesMetaData>> cachedStores = ConcurrentCollections.newConcurrentMap();
@Inject public BlobReuseExistingNodeAllocation(Settings settings, Node node,
TransportNodesListShardStoreMetaData transportNodesListShardStoreMetaData) {
@Inject public BlobReuseExistingGatewayAllocator(Settings settings, Node node,
TransportNodesListShardStoreMetaData transportNodesListShardStoreMetaData) {
super(settings);
this.node = node; // YACK!, we need the Gateway, but it creates crazy circular dependency
this.listShardStoreMetaData = transportNodesListShardStoreMetaData;
@ -144,7 +150,7 @@ public class BlobReuseExistingNodeAllocation extends NodeAllocation {
// check if we can allocate on that node...
// we only check for NO, since if this node is THROTTLING and it has enough "same data"
// then we will try and assign it next time
if (nodeAllocations.canAllocate(shard, node, allocation) == Decision.NO) {
if (nodeAllocations.canAllocate(shard, node, allocation) == NodeAllocation.Decision.NO) {
continue;
}
@ -259,7 +265,7 @@ public class BlobReuseExistingNodeAllocation extends NodeAllocation {
} else {
nodesIds = Sets.newHashSet();
// clean nodes that have failed
for (Iterator<DiscoveryNode> it = shardStores.keySet().iterator(); it.hasNext();) {
for (Iterator<DiscoveryNode> it = shardStores.keySet().iterator(); it.hasNext(); ) {
DiscoveryNode node = it.next();
if (!nodes.nodeExists(node.id())) {
it.remove();

View File

@ -19,7 +19,7 @@
package org.elasticsearch.gateway.blobstore;
import org.elasticsearch.cluster.routing.allocation.ShardAllocationModule;
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocatorModule;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.inject.PreProcessModule;
@ -30,8 +30,8 @@ import org.elasticsearch.common.inject.PreProcessModule;
public abstract class BlobStoreGatewayModule extends AbstractModule implements PreProcessModule {
@Override public void processModule(Module module) {
if (module instanceof ShardAllocationModule) {
((ShardAllocationModule) module).addNodeAllocation(BlobReuseExistingNodeAllocation.class);
if (module instanceof ShardsAllocatorModule) {
((ShardsAllocatorModule) module).setGatewayAllocator(BlobReuseExistingGatewayAllocator.class);
}
}
}

View File

@ -32,8 +32,10 @@ import org.elasticsearch.cluster.routing.allocation.NodeAllocation;
import org.elasticsearch.cluster.routing.allocation.NodeAllocations;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation;
import org.elasticsearch.cluster.routing.allocation.allocator.GatewayAllocator;
import org.elasticsearch.common.collect.Maps;
import org.elasticsearch.common.collect.Sets;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.trove.iterator.TObjectLongIterator;
@ -54,7 +56,7 @@ import java.util.concurrent.ConcurrentMap;
/**
* @author kimchy (shay.banon)
*/
public class LocalGatewayNodeAllocation extends NodeAllocation {
public class LocalGatewayAllocator extends AbstractComponent implements GatewayAllocator {
static {
IndexMetaData.addDynamicSettings("index.recovery.initial_shards");
@ -72,8 +74,8 @@ public class LocalGatewayNodeAllocation extends NodeAllocation {
private final String initialShards;
@Inject public LocalGatewayNodeAllocation(Settings settings,
TransportNodesListGatewayStartedShards listGatewayStartedShards, TransportNodesListShardStoreMetaData listShardStoreMetaData) {
@Inject public LocalGatewayAllocator(Settings settings,
TransportNodesListGatewayStartedShards listGatewayStartedShards, TransportNodesListShardStoreMetaData listShardStoreMetaData) {
super(settings);
this.listGatewayStartedShards = listGatewayStartedShards;
this.listShardStoreMetaData = listShardStoreMetaData;
@ -187,10 +189,10 @@ public class LocalGatewayNodeAllocation extends NodeAllocation {
Set<DiscoveryNode> noNodes = Sets.newHashSet();
for (DiscoveryNode discoNode : nodesWithHighestVersion) {
RoutingNode node = routingNodes.node(discoNode.id());
Decision decision = nodeAllocations.canAllocate(shard, node, allocation);
NodeAllocation.Decision decision = nodeAllocations.canAllocate(shard, node, allocation);
if (decision == NodeAllocation.Decision.THROTTLE) {
throttledNodes.add(discoNode);
} else if (decision == Decision.NO) {
} else if (decision == NodeAllocation.Decision.NO) {
noNodes.add(discoNode);
} else {
if (logger.isDebugEnabled()) {
@ -284,7 +286,7 @@ public class LocalGatewayNodeAllocation extends NodeAllocation {
// check if we can allocate on that node...
// we only check for NO, since if this node is THROTTLING and it has enough "same data"
// then we will try and assign it next time
if (nodeAllocations.canAllocate(shard, node, allocation) == Decision.NO) {
if (nodeAllocations.canAllocate(shard, node, allocation) == NodeAllocation.Decision.NO) {
continue;
}

View File

@ -19,7 +19,7 @@
package org.elasticsearch.gateway.local;
import org.elasticsearch.cluster.routing.allocation.ShardAllocationModule;
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocatorModule;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.inject.PreProcessModule;
@ -37,8 +37,8 @@ public class LocalGatewayModule extends AbstractModule implements PreProcessModu
}
@Override public void processModule(Module module) {
if (module instanceof ShardAllocationModule) {
((ShardAllocationModule) module).addNodeAllocation(LocalGatewayNodeAllocation.class);
if (module instanceof ShardsAllocatorModule) {
((ShardsAllocatorModule) module).setGatewayAllocator(LocalGatewayAllocator.class);
}
}
}

View File

@ -0,0 +1,41 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.gateway.none;
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
import org.elasticsearch.cluster.routing.allocation.NodeAllocations;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation;
import org.elasticsearch.cluster.routing.allocation.allocator.GatewayAllocator;
/**
*/
public class NoneGatewayAllocator implements GatewayAllocator {
@Override public void applyStartedShards(NodeAllocations nodeAllocations, StartedRerouteAllocation allocation) {
}
@Override public void applyFailedShards(NodeAllocations nodeAllocations, FailedRerouteAllocation allocation) {
}
@Override public boolean allocateUnassigned(NodeAllocations nodeAllocations, RoutingAllocation allocation) {
return false;
}
}

View File

@ -19,13 +19,21 @@
package org.elasticsearch.gateway.none;
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocatorModule;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.inject.PreProcessModule;
import org.elasticsearch.gateway.Gateway;
/**
* @author kimchy (Shay Banon)
*/
public class NoneGatewayModule extends AbstractModule {
public class NoneGatewayModule extends AbstractModule implements PreProcessModule {
@Override public void processModule(Module module) {
if (module instanceof ShardsAllocatorModule) {
((ShardsAllocatorModule) module).setGatewayAllocator(NoneGatewayAllocator.class);
}
}
@Override protected void configure() {
bind(Gateway.class).to(NoneGateway.class).asEagerSingleton();

View File

@ -44,7 +44,7 @@ public class ClusterRebalanceRoutingTests {
private final ESLogger logger = Loggers.getLogger(ClusterRebalanceRoutingTests.class);
@Test public void testAlways() {
ShardsAllocation strategy = new ShardsAllocation(settingsBuilder().put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceNodeAllocation.ClusterRebalanceType.ALWAYS.toString()).build());
AllocationService strategy = new AllocationService(settingsBuilder().put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceNodeAllocation.ClusterRebalanceType.ALWAYS.toString()).build());
MetaData metaData = newMetaDataBuilder()
.put(newIndexMetaDataBuilder("test1").numberOfShards(1).numberOfReplicas(1))
@ -129,7 +129,7 @@ public class ClusterRebalanceRoutingTests {
@Test public void testClusterPrimariesActive1() {
ShardsAllocation strategy = new ShardsAllocation(settingsBuilder().put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceNodeAllocation.ClusterRebalanceType.INDICES_PRIMARIES_ACTIVE.toString()).build());
AllocationService strategy = new AllocationService(settingsBuilder().put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceNodeAllocation.ClusterRebalanceType.INDICES_PRIMARIES_ACTIVE.toString()).build());
MetaData metaData = newMetaDataBuilder()
.put(newIndexMetaDataBuilder("test1").numberOfShards(1).numberOfReplicas(1))
@ -232,7 +232,7 @@ public class ClusterRebalanceRoutingTests {
}
@Test public void testClusterPrimariesActive2() {
ShardsAllocation strategy = new ShardsAllocation(settingsBuilder().put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceNodeAllocation.ClusterRebalanceType.INDICES_PRIMARIES_ACTIVE.toString()).build());
AllocationService strategy = new AllocationService(settingsBuilder().put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceNodeAllocation.ClusterRebalanceType.INDICES_PRIMARIES_ACTIVE.toString()).build());
MetaData metaData = newMetaDataBuilder()
.put(newIndexMetaDataBuilder("test1").numberOfShards(1).numberOfReplicas(1))
@ -315,7 +315,7 @@ public class ClusterRebalanceRoutingTests {
}
@Test public void testClusterAllActive1() {
ShardsAllocation strategy = new ShardsAllocation(settingsBuilder().put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceNodeAllocation.ClusterRebalanceType.INDICES_ALL_ACTIVE.toString()).build());
AllocationService strategy = new AllocationService(settingsBuilder().put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceNodeAllocation.ClusterRebalanceType.INDICES_ALL_ACTIVE.toString()).build());
MetaData metaData = newMetaDataBuilder()
.put(newIndexMetaDataBuilder("test1").numberOfShards(1).numberOfReplicas(1))
@ -437,7 +437,7 @@ public class ClusterRebalanceRoutingTests {
}
@Test public void testClusterAllActive2() {
ShardsAllocation strategy = new ShardsAllocation(settingsBuilder().put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceNodeAllocation.ClusterRebalanceType.INDICES_ALL_ACTIVE.toString()).build());
AllocationService strategy = new AllocationService(settingsBuilder().put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceNodeAllocation.ClusterRebalanceType.INDICES_ALL_ACTIVE.toString()).build());
MetaData metaData = newMetaDataBuilder()
.put(newIndexMetaDataBuilder("test1").numberOfShards(1).numberOfReplicas(1))
@ -520,7 +520,7 @@ public class ClusterRebalanceRoutingTests {
}
@Test public void testClusterAllActive3() {
ShardsAllocation strategy = new ShardsAllocation(settingsBuilder().put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceNodeAllocation.ClusterRebalanceType.INDICES_ALL_ACTIVE.toString()).build());
AllocationService strategy = new AllocationService(settingsBuilder().put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceNodeAllocation.ClusterRebalanceType.INDICES_ALL_ACTIVE.toString()).build());
MetaData metaData = newMetaDataBuilder()
.put(newIndexMetaDataBuilder("test1").numberOfShards(1).numberOfReplicas(1))

View File

@ -44,7 +44,7 @@ public class ConcurrentRebalanceRoutingTests {
private final ESLogger logger = Loggers.getLogger(ConcurrentRebalanceRoutingTests.class);
@Test public void testClusterConcurrentRebalance() {
ShardsAllocation strategy = new ShardsAllocation(settingsBuilder()
AllocationService strategy = new AllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.allow_rebalance", "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", 3)

View File

@ -47,7 +47,7 @@ public class ElectReplicaAsPrimaryDuringRelocationTests {
private final ESLogger logger = Loggers.getLogger(ElectReplicaAsPrimaryDuringRelocationTests.class);
@Test public void testElectReplicaAsPrimaryDuringRelocation() {
ShardsAllocation strategy = new ShardsAllocation(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build());
AllocationService strategy = new AllocationService(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build());
logger.info("Building initial routing table");

View File

@ -44,7 +44,7 @@ public class FailedNodeRoutingTests {
private final ESLogger logger = Loggers.getLogger(FailedNodeRoutingTests.class);
@Test public void simpleFailedNodeTest() {
ShardsAllocation strategy = new ShardsAllocation(settingsBuilder().put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceNodeAllocation.ClusterRebalanceType.ALWAYS.toString()).build());
AllocationService strategy = new AllocationService(settingsBuilder().put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceNodeAllocation.ClusterRebalanceType.ALWAYS.toString()).build());
MetaData metaData = newMetaDataBuilder()
.put(newIndexMetaDataBuilder("test1").numberOfShards(1).numberOfReplicas(1))
@ -103,7 +103,7 @@ public class FailedNodeRoutingTests {
}
@Test public void simpleFailedNodeTestNoReassign() {
ShardsAllocation strategy = new ShardsAllocation(settingsBuilder().put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceNodeAllocation.ClusterRebalanceType.ALWAYS.toString()).build());
AllocationService strategy = new AllocationService(settingsBuilder().put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceNodeAllocation.ClusterRebalanceType.ALWAYS.toString()).build());
MetaData metaData = newMetaDataBuilder()
.put(newIndexMetaDataBuilder("test1").numberOfShards(1).numberOfReplicas(1))

View File

@ -49,7 +49,7 @@ public class FailedShardsRoutingTests {
private final ESLogger logger = Loggers.getLogger(FailedShardsRoutingTests.class);
@Test public void failPrimaryStartedCheckReplicaElected() {
ShardsAllocation strategy = new ShardsAllocation(settingsBuilder()
AllocationService strategy = new AllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.allow_rebalance", "always")
.build());
@ -129,7 +129,7 @@ public class FailedShardsRoutingTests {
}
@Test public void firstAllocationFailureSingleNode() {
ShardsAllocation strategy = new ShardsAllocation(settingsBuilder()
AllocationService strategy = new AllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.allow_rebalance", "always")
.build());
@ -185,7 +185,7 @@ public class FailedShardsRoutingTests {
}
@Test public void firstAllocationFailureTwoNodes() {
ShardsAllocation strategy = new ShardsAllocation(settingsBuilder()
AllocationService strategy = new AllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.allow_rebalance", "always")
.build());
@ -241,7 +241,7 @@ public class FailedShardsRoutingTests {
}
@Test public void rebalanceFailure() {
ShardsAllocation strategy = new ShardsAllocation(settingsBuilder()
AllocationService strategy = new AllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.allow_rebalance", "always")
.build());

View File

@ -46,7 +46,7 @@ public class PrimaryElectionRoutingTests {
private final ESLogger logger = Loggers.getLogger(PrimaryElectionRoutingTests.class);
@Test public void testBackupElectionToPrimaryWhenPrimaryCanBeAllocatedToAnotherNode() {
ShardsAllocation strategy = new ShardsAllocation(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build());
AllocationService strategy = new AllocationService(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build());
logger.info("Building initial routing table");

View File

@ -47,7 +47,7 @@ public class PrimaryNotRelocatedWhileBeingRecoveredTests {
@Test public void testPrimaryNotRelocatedWhileBeingRecoveredFrom() {
ShardsAllocation strategy = new ShardsAllocation(settingsBuilder()
AllocationService strategy = new AllocationService(settingsBuilder()
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put("cluster.routing.allocation.node_initial_primaries_recoveries", 10)
.build());

View File

@ -47,7 +47,7 @@ public class RebalanceAfterActiveTests {
private final ESLogger logger = Loggers.getLogger(RebalanceAfterActiveTests.class);
@Test public void testRebalanceOnlyAfterAllShardsAreActive() {
ShardsAllocation strategy = new ShardsAllocation(settingsBuilder()
AllocationService strategy = new AllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.allow_rebalance", "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)

View File

@ -46,7 +46,7 @@ public class ReplicaAllocatedAfterPrimaryTests {
private final ESLogger logger = Loggers.getLogger(ReplicaAllocatedAfterPrimaryTests.class);
@Test public void testBackupIsAllocatedAfterPrimary() {
ShardsAllocation strategy = new ShardsAllocation(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build());
AllocationService strategy = new AllocationService(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build());
logger.info("Building initial routing table");

View File

@ -44,7 +44,7 @@ public class ShardVersioningTests {
private final ESLogger logger = Loggers.getLogger(ShardVersioningTests.class);
@Test public void simple() {
ShardsAllocation strategy = new ShardsAllocation(settingsBuilder().put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceNodeAllocation.ClusterRebalanceType.ALWAYS.toString()).build());
AllocationService strategy = new AllocationService(settingsBuilder().put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceNodeAllocation.ClusterRebalanceType.ALWAYS.toString()).build());
MetaData metaData = newMetaDataBuilder()
.put(newIndexMetaDataBuilder("test1").numberOfShards(1).numberOfReplicas(1))

View File

@ -55,7 +55,7 @@ public class SingleShardNoReplicasRoutingTests {
private final ESLogger logger = Loggers.getLogger(SingleShardNoReplicasRoutingTests.class);
@Test public void testSingleIndexStartedShard() {
ShardsAllocation strategy = new ShardsAllocation(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build());
AllocationService strategy = new AllocationService(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build());
logger.info("Building initial routing table");
@ -156,7 +156,7 @@ public class SingleShardNoReplicasRoutingTests {
}
@Test public void testSingleIndexShardFailed() {
ShardsAllocation strategy = new ShardsAllocation(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build());
AllocationService strategy = new AllocationService(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build());
logger.info("Building initial routing table");
@ -205,7 +205,7 @@ public class SingleShardNoReplicasRoutingTests {
}
@Test public void testMultiIndexEvenDistribution() {
ShardsAllocation strategy = new ShardsAllocation(settingsBuilder()
AllocationService strategy = new AllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.allow_rebalance", "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
@ -317,7 +317,7 @@ public class SingleShardNoReplicasRoutingTests {
}
@Test public void testMultiIndexUnevenNodes() {
ShardsAllocation strategy = new ShardsAllocation(settingsBuilder()
AllocationService strategy = new AllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.allow_rebalance", "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)

View File

@ -46,7 +46,7 @@ public class SingleShardOneReplicaRoutingTests {
private final ESLogger logger = Loggers.getLogger(SingleShardOneReplicaRoutingTests.class);
@Test public void testSingleIndexFirstStartPrimaryThenBackups() {
ShardsAllocation strategy = new ShardsAllocation(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build());
AllocationService strategy = new AllocationService(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build());
logger.info("Building initial routing table");

View File

@ -46,7 +46,7 @@ public class TenShardsOneReplicaRoutingTests {
private final ESLogger logger = Loggers.getLogger(TenShardsOneReplicaRoutingTests.class);
@Test public void testSingleIndexFirstStartPrimaryThenBackups() {
ShardsAllocation strategy = new ShardsAllocation(settingsBuilder()
AllocationService strategy = new AllocationService(settingsBuilder()
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put("cluster.routing.allocation.node_initial_primaries_recoveries", 10)
.put("cluster.routing.allocation.allow_rebalance", "always")

View File

@ -45,7 +45,7 @@ public class ThrottlingAllocationTests {
private final ESLogger logger = Loggers.getLogger(ThrottlingAllocationTests.class);
@Test public void testPrimaryRecoveryThrottling() {
ShardsAllocation strategy = new ShardsAllocation(settingsBuilder()
AllocationService strategy = new AllocationService(settingsBuilder()
.put("cluster.routing.allocation.node_concurrent_recoveries", 3)
.put("cluster.routing.allocation.node_initial_primaries_recoveries", 3)
.build());
@ -105,7 +105,7 @@ public class ThrottlingAllocationTests {
}
@Test public void testReplicaAndPrimaryRecoveryThrottling() {
ShardsAllocation strategy = new ShardsAllocation(settingsBuilder()
AllocationService strategy = new AllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 3)
.put("cluster.routing.allocation.node_initial_primaries_recoveries", 3)
.build());

View File

@ -27,7 +27,7 @@ public class UpdateNumberOfReplicasTests {
private final ESLogger logger = Loggers.getLogger(UpdateNumberOfReplicasTests.class);
@Test public void testUpdateNumberOfReplicas() {
ShardsAllocation strategy = new ShardsAllocation(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build());
AllocationService strategy = new AllocationService(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build());
logger.info("Building initial routing table");

View File

@ -24,7 +24,7 @@ import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.allocation.ShardsAllocation;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.transport.DummyTransportAddress;
@ -55,7 +55,7 @@ public class ClusterSerializationTests {
ClusterState clusterState = newClusterStateBuilder().nodes(nodes).metaData(metaData).routingTable(routingTable).build();
ShardsAllocation strategy = new ShardsAllocation();
AllocationService strategy = new AllocationService();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(strategy.reroute(clusterState).routingTable()).build();
ClusterState serializedClusterState = ClusterState.Builder.fromBytes(ClusterState.Builder.toBytes(clusterState), newNode("node1"));
@ -77,7 +77,7 @@ public class ClusterSerializationTests {
ClusterState clusterState = newClusterStateBuilder().nodes(nodes).metaData(metaData).routingTable(routingTable).build();
ShardsAllocation strategy = new ShardsAllocation();
AllocationService strategy = new AllocationService();
RoutingTable source = strategy.reroute(clusterState).routingTable();
BytesStreamOutput outStream = new BytesStreamOutput();