refactoring in routing logic to allow adding allocation explanation that later can be shown

This commit is contained in:
kimchy 2010-09-21 11:37:36 +02:00
parent dbb2ae69b9
commit b5b36215ae
30 changed files with 512 additions and 226 deletions

View File

@ -25,6 +25,8 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.allocation.AllocationExplanation;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.io.stream.*;
import org.elasticsearch.common.settings.Settings;
@ -46,19 +48,22 @@ public class ClusterState {
private final ClusterBlocks blocks;
private final AllocationExplanation allocationExplanation;
// built on demand
private volatile RoutingNodes routingNodes;
public ClusterState(long version, ClusterState state) {
this(version, state.metaData(), state.routingTable(), state.nodes(), state.blocks());
this(version, state.metaData(), state.routingTable(), state.nodes(), state.blocks(), state.allocationExplanation());
}
public ClusterState(long version, MetaData metaData, RoutingTable routingTable, DiscoveryNodes nodes, ClusterBlocks blocks) {
public ClusterState(long version, MetaData metaData, RoutingTable routingTable, DiscoveryNodes nodes, ClusterBlocks blocks, AllocationExplanation allocationExplanation) {
this.version = version;
this.metaData = metaData;
this.routingTable = routingTable;
this.nodes = nodes;
this.blocks = blocks;
this.allocationExplanation = allocationExplanation;
}
public long version() {
@ -109,6 +114,14 @@ public class ClusterState {
return blocks;
}
public AllocationExplanation allocationExplanation() {
return this.allocationExplanation;
}
public AllocationExplanation getAllocationExplanation() {
return allocationExplanation();
}
/**
* Returns a built (on demand) routing nodes view of the routing table. <b>NOTE, the routing nodes
* are mutable, use them just for read operations</b>
@ -141,6 +154,8 @@ public class ClusterState {
private ClusterBlocks blocks = ClusterBlocks.EMPTY_CLUSTER_BLOCK;
private AllocationExplanation allocationExplanation = AllocationExplanation.EMPTY;
public Builder nodes(DiscoveryNodes.Builder nodesBuilder) {
return nodes(nodesBuilder.build());
}
@ -154,6 +169,12 @@ public class ClusterState {
return routingTable(routingTable.build());
}
public Builder routingResult(RoutingAllocation.Result routingResult) {
this.routingTable = routingResult.routingTable();
this.allocationExplanation = routingResult.explanation();
return this;
}
public Builder routingTable(RoutingTable routingTable) {
this.routingTable = routingTable;
return this;
@ -177,6 +198,11 @@ public class ClusterState {
return this;
}
public Builder allocationExplanation(AllocationExplanation allocationExplanation) {
this.allocationExplanation = allocationExplanation;
return this;
}
public Builder version(long version) {
this.version = version;
return this;
@ -188,11 +214,12 @@ public class ClusterState {
this.routingTable = state.routingTable();
this.metaData = state.metaData();
this.blocks = state.blocks();
this.allocationExplanation = state.allocationExplanation();
return this;
}
public ClusterState build() {
return new ClusterState(version, metaData, routingTable, nodes, blocks);
return new ClusterState(version, metaData, routingTable, nodes, blocks, allocationExplanation);
}
public static byte[] toBytes(ClusterState state) throws IOException {
@ -211,6 +238,7 @@ public class ClusterState {
RoutingTable.Builder.writeTo(state.routingTable(), out);
DiscoveryNodes.Builder.writeTo(state.nodes(), out);
ClusterBlocks.Builder.writeClusterBlocks(state.blocks(), out);
state.allocationExplanation().writeTo(out);
}
public static ClusterState readFrom(StreamInput in, @Nullable Settings globalSettings, @Nullable DiscoveryNode localNode) throws IOException {
@ -220,6 +248,7 @@ public class ClusterState {
builder.routingTable = RoutingTable.Builder.readFrom(in);
builder.nodes = DiscoveryNodes.Builder.readFrom(in, localNode);
builder.blocks = ClusterBlocks.Builder.readClusterBlocks(in);
builder.allocationExplanation = AllocationExplanation.readAllocationExplanation(in);
return builder.build();
}
}

View File

@ -28,6 +28,7 @@ import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.ShardsAllocation;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
@ -124,12 +125,11 @@ public class ShardStateAction extends AbstractComponent {
if (logger.isDebugEnabled()) {
logger.debug("Applying failed shard {}, reason [{}]", shardRouting, reason);
}
RoutingTable prevRoutingTable = currentState.routingTable();
RoutingTable newRoutingTable = shardsAllocation.applyFailedShards(currentState, newArrayList(shardRouting));
if (prevRoutingTable == newRoutingTable) {
RoutingAllocation.Result routingResult = shardsAllocation.applyFailedShards(currentState, newArrayList(shardRouting));
if (!routingResult.changed()) {
return currentState;
}
return newClusterStateBuilder().state(currentState).routingTable(newRoutingTable).build();
return newClusterStateBuilder().state(currentState).routingResult(routingResult).build();
}
});
}
@ -163,11 +163,11 @@ public class ShardStateAction extends AbstractComponent {
if (logger.isDebugEnabled()) {
logger.debug("applying started shard {}, reason [{}]", shardRouting, reason);
}
RoutingTable newRoutingTable = shardsAllocation.applyStartedShards(currentState, newArrayList(shardRouting));
if (routingTable == newRoutingTable) {
RoutingAllocation.Result routingResult = shardsAllocation.applyStartedShards(currentState, newArrayList(shardRouting));
if (!routingResult.changed()) {
return currentState;
}
return newClusterStateBuilder().state(currentState).routingTable(newRoutingTable).build();
return newClusterStateBuilder().state(currentState).routingResult(routingResult).build();
}
});
}

View File

@ -28,6 +28,7 @@ import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.ShardsAllocation;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Maps;
@ -295,8 +296,8 @@ public class MetaDataCreateIndexService extends AbstractComponent {
IndexRoutingTable.Builder indexRoutingBuilder = new IndexRoutingTable.Builder(request.index)
.initializeEmpty(currentState.metaData().index(request.index));
routingTableBuilder.add(indexRoutingBuilder);
RoutingTable newRoutingTable = shardsAllocation.reroute(newClusterStateBuilder().state(currentState).routingTable(routingTableBuilder).build());
return newClusterStateBuilder().state(currentState).routingTable(newRoutingTable).build();
RoutingAllocation.Result routingResult = shardsAllocation.reroute(newClusterStateBuilder().state(currentState).routingTable(routingTableBuilder).build());
return newClusterStateBuilder().state(currentState).routingResult(routingResult).build();
}
@Override public void clusterStateProcessed(ClusterState clusterState) {

View File

@ -26,6 +26,7 @@ import org.elasticsearch.cluster.action.index.NodeIndexDeletedAction;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.ShardsAllocation;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
@ -89,7 +90,7 @@ public class MetaDataDeleteIndexService extends AbstractComponent {
.remove(request.index)
.build();
RoutingTable newRoutingTable = shardsAllocation.reroute(
RoutingAllocation.Result routingResult = shardsAllocation.reroute(
newClusterStateBuilder().state(currentState).routingTable(routingTableBuilder).metaData(newMetaData).build());
ClusterBlocks blocks = ClusterBlocks.builder().blocks(currentState.blocks()).removeIndexBlocks(request.index).build();
@ -117,7 +118,7 @@ public class MetaDataDeleteIndexService extends AbstractComponent {
listener.timeout = timeoutTask;
return newClusterStateBuilder().state(currentState).routingTable(newRoutingTable).metaData(newMetaData).blocks(blocks).build();
return newClusterStateBuilder().state(currentState).routingResult(routingResult).metaData(newMetaData).blocks(blocks).build();
} catch (Exception e) {
listener.onFailure(e);
return currentState;

View File

@ -21,6 +21,7 @@ package org.elasticsearch.cluster.routing;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.ShardsAllocation;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
@ -120,12 +121,12 @@ public class RoutingService extends AbstractLifecycleComponent<RoutingService> i
}
clusterService.submitStateUpdateTask(CLUSTER_UPDATE_TASK_SOURCE, new ClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) {
RoutingTable newRoutingTable = shardsAllocation.reroute(currentState);
if (newRoutingTable == currentState.routingTable()) {
RoutingAllocation.Result routingResult = shardsAllocation.reroute(currentState);
if (!routingResult.changed()) {
// no state changed
return currentState;
}
return newClusterStateBuilder().state(currentState).routingTable(newRoutingTable).build();
return newClusterStateBuilder().state(currentState).routingResult(routingResult).build();
}
});
routingTableDirty = false;

View File

@ -0,0 +1,105 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.routing.allocation;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.collect.Maps;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.index.shard.ShardId;
import java.io.IOException;
import java.util.List;
import java.util.Map;
/**
* @author kimchy (shay.banon)
*/
public class AllocationExplanation implements Streamable {
public static final AllocationExplanation EMPTY = new AllocationExplanation();
public static class NodeExplanation {
private final DiscoveryNode node;
private final String description;
public NodeExplanation(DiscoveryNode node, String description) {
this.node = node;
this.description = description;
}
public DiscoveryNode node() {
return node;
}
public String description() {
return description;
}
}
private final Map<ShardId, List<NodeExplanation>> explanations = Maps.newHashMap();
public AllocationExplanation add(ShardId shardId, NodeExplanation nodeExplanation) {
List<NodeExplanation> list = explanations.get(shardId);
if (list == null) {
list = Lists.newArrayList();
explanations.put(shardId, list);
}
list.add(nodeExplanation);
return this;
}
public Map<ShardId, List<NodeExplanation>> explanations() {
return this.explanations;
}
public static AllocationExplanation readAllocationExplanation(StreamInput in) throws IOException {
AllocationExplanation e = new AllocationExplanation();
e.readFrom(in);
return e;
}
@Override public void readFrom(StreamInput in) throws IOException {
int size = in.readVInt();
for (int i = 0; i < size; i++) {
ShardId shardId = ShardId.readShardId(in);
int size2 = in.readVInt();
List<NodeExplanation> ne = Lists.newArrayListWithCapacity(size2);
for (int j = 0; j < size2; j++) {
ne.add(new NodeExplanation(DiscoveryNode.readNode(in), in.readUTF()));
}
}
}
@Override public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(explanations.size());
for (Map.Entry<ShardId, List<NodeExplanation>> entry : explanations.entrySet()) {
entry.getKey().writeTo(out);
out.writeVInt(entry.getValue().size());
for (NodeExplanation nodeExplanation : entry.getValue()) {
nodeExplanation.node().writeTo(out);
out.writeUTF(nodeExplanation.description());
}
}
}
}

View File

@ -0,0 +1,43 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.routing.allocation;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import java.util.List;
/**
* @author kimchy (shay.banon)
*/
public class FailedRerouteAllocation extends RoutingAllocation {
private final List<? extends ShardRouting> failedShards;
public FailedRerouteAllocation(RoutingNodes routingNodes, DiscoveryNodes nodes, List<? extends ShardRouting> failedShards) {
super(routingNodes, nodes);
this.failedShards = failedShards;
}
public List<? extends ShardRouting> failedShards() {
return failedShards;
}
}

View File

@ -19,15 +19,11 @@
package org.elasticsearch.cluster.routing.allocation;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import java.util.List;
/**
* A pluggable logic allowing to control if allocation of a shard is allowed on a specific node.
*
@ -56,22 +52,22 @@ public abstract class NodeAllocation extends AbstractComponent {
super(settings);
}
public void applyStartedShards(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes, List<? extends ShardRouting> startedShards) {
public void applyStartedShards(NodeAllocations nodeAllocations, StartedRerouteAllocation allocation) {
}
public void applyFailedShards(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes, List<? extends ShardRouting> failedShards) {
public void applyFailedShards(NodeAllocations nodeAllocations, FailedRerouteAllocation allocation) {
}
public boolean allocateUnassigned(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes) {
public boolean allocateUnassigned(NodeAllocations nodeAllocations, RoutingAllocation allocation) {
return false;
}
public boolean canRebalance(ShardRouting shardRouting, RoutingNodes routingNodes, DiscoveryNodes nodes) {
public boolean canRebalance(ShardRouting shardRouting, RoutingAllocation allocation) {
return true;
}
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingNodes routingNodes) {
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return Decision.YES;
}
}

View File

@ -19,15 +19,12 @@
package org.elasticsearch.cluster.routing.allocation;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.collect.ImmutableSet;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import java.util.List;
import java.util.Set;
/**
@ -54,39 +51,39 @@ public class NodeAllocations extends NodeAllocation {
this.allocations = allocations.toArray(new NodeAllocation[allocations.size()]);
}
@Override public void applyStartedShards(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes, List<? extends ShardRouting> startedShards) {
for (NodeAllocation allocation : allocations) {
allocation.applyStartedShards(nodeAllocations, routingNodes, nodes, startedShards);
@Override public void applyStartedShards(NodeAllocations nodeAllocations, StartedRerouteAllocation allocation) {
for (NodeAllocation allocation1 : allocations) {
allocation1.applyStartedShards(nodeAllocations, allocation);
}
}
@Override public void applyFailedShards(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes, List<? extends ShardRouting> failedShards) {
for (NodeAllocation allocation : allocations) {
allocation.applyFailedShards(nodeAllocations, routingNodes, nodes, failedShards);
@Override public void applyFailedShards(NodeAllocations nodeAllocations, FailedRerouteAllocation allocation) {
for (NodeAllocation allocation1 : allocations) {
allocation1.applyFailedShards(nodeAllocations, allocation);
}
}
@Override public boolean canRebalance(ShardRouting shardRouting, RoutingNodes routingNodes, DiscoveryNodes nodes) {
for (NodeAllocation allocation : allocations) {
if (!allocation.canRebalance(shardRouting, routingNodes, nodes)) {
@Override public boolean canRebalance(ShardRouting shardRouting, RoutingAllocation allocation) {
for (NodeAllocation allocation1 : allocations) {
if (!allocation1.canRebalance(shardRouting, allocation)) {
return false;
}
}
return true;
}
@Override public boolean allocateUnassigned(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes) {
@Override public boolean allocateUnassigned(NodeAllocations nodeAllocations, RoutingAllocation allocation) {
boolean changed = false;
for (NodeAllocation allocation : allocations) {
changed |= allocation.allocateUnassigned(nodeAllocations, routingNodes, nodes);
for (NodeAllocation allocation1 : allocations) {
changed |= allocation1.allocateUnassigned(nodeAllocations, allocation);
}
return changed;
}
@Override public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingNodes routingNodes) {
@Override public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
Decision ret = Decision.YES;
for (NodeAllocation allocation : allocations) {
Decision decision = allocation.canAllocate(shardRouting, node, routingNodes);
for (NodeAllocation allocation1 : allocations) {
Decision decision = allocation1.canAllocate(shardRouting, node, allocation);
if (decision == Decision.NO) {
return Decision.NO;
} else if (decision == Decision.THROTTLE) {

View File

@ -19,9 +19,7 @@
package org.elasticsearch.cluster.routing.allocation;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.MutableShardRouting;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
@ -39,8 +37,8 @@ public class RebalanceOnlyWhenActiveNodeAllocation extends NodeAllocation {
super(settings);
}
@Override public boolean canRebalance(ShardRouting shardRouting, RoutingNodes routingNodes, DiscoveryNodes nodes) {
List<MutableShardRouting> shards = routingNodes.shardsRoutingFor(shardRouting);
@Override public boolean canRebalance(ShardRouting shardRouting, RoutingAllocation allocation) {
List<MutableShardRouting> shards = allocation.routingNodes().shardsRoutingFor(shardRouting);
// its ok to check for active here, since in relocation, a shard is split into two in routing
// nodes, once relocating, and one initializing
for (ShardRouting allShard : shards) {

View File

@ -19,10 +19,8 @@
package org.elasticsearch.cluster.routing.allocation;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.MutableShardRouting;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
@ -38,15 +36,11 @@ public class ReplicaAfterPrimaryActiveNodeAllocation extends NodeAllocation {
super(settings);
}
@Override public boolean allocateUnassigned(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes) {
return false;
}
@Override public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingNodes routingNodes) {
@Override public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
if (shardRouting.primary()) {
return Decision.YES;
}
MutableShardRouting primary = routingNodes.findPrimaryForReplica(shardRouting);
MutableShardRouting primary = allocation.routingNodes().findPrimaryForReplica(shardRouting);
if (primary == null || !primary.active()) {
return Decision.NO;
}

View File

@ -0,0 +1,84 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.routing.allocation;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
/**
* @author kimchy (shay.banon)
*/
public class RoutingAllocation {
public static class Result {
private final boolean changed;
private final RoutingTable routingTable;
private final AllocationExplanation explanation;
public Result(boolean changed, RoutingTable routingTable, AllocationExplanation explanation) {
this.changed = changed;
this.routingTable = routingTable;
this.explanation = explanation;
}
public boolean changed() {
return this.changed;
}
public RoutingTable routingTable() {
return routingTable;
}
public AllocationExplanation explanation() {
return explanation;
}
}
private final RoutingNodes routingNodes;
private final DiscoveryNodes nodes;
private final AllocationExplanation explanation = new AllocationExplanation();
public RoutingAllocation(RoutingNodes routingNodes, DiscoveryNodes nodes) {
this.routingNodes = routingNodes;
this.nodes = nodes;
}
public RoutingTable routingTable() {
return routingNodes.routingTable();
}
public RoutingNodes routingNodes() {
return routingNodes;
}
public DiscoveryNodes nodes() {
return nodes;
}
public AllocationExplanation explanation() {
return explanation;
}
}

View File

@ -19,10 +19,8 @@
package org.elasticsearch.cluster.routing.allocation;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.MutableShardRouting;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
@ -38,11 +36,7 @@ public class SameShardNodeAllocation extends NodeAllocation {
super(settings);
}
@Override public boolean allocateUnassigned(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes) {
return false;
}
@Override public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingNodes routingNodes) {
@Override public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
for (MutableShardRouting current : node.shards()) {
// we do not allow for two shards of the same shard id to exists on the same node
if (current.shardId().equals(shardRouting.shardId())) {

View File

@ -21,7 +21,6 @@ package org.elasticsearch.cluster.routing.allocation;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
@ -60,15 +59,16 @@ public class ShardsAllocation extends AbstractComponent {
*
* <p>If the same instance of the routing table is returned, then no change has been made.
*/
public RoutingTable applyStartedShards(ClusterState clusterState, List<? extends ShardRouting> startedShards) {
public RoutingAllocation.Result applyStartedShards(ClusterState clusterState, List<? extends ShardRouting> startedShards) {
RoutingNodes routingNodes = clusterState.routingNodes();
nodeAllocations.applyStartedShards(nodeAllocations, routingNodes, clusterState.nodes(), startedShards);
StartedRerouteAllocation allocation = new StartedRerouteAllocation(routingNodes, clusterState.nodes(), startedShards);
nodeAllocations.applyStartedShards(nodeAllocations, allocation);
boolean changed = applyStartedShards(routingNodes, startedShards);
if (!changed) {
return clusterState.routingTable();
return new RoutingAllocation.Result(false, clusterState.routingTable(), allocation.explanation());
}
reroute(routingNodes, clusterState.nodes());
return new RoutingTable.Builder().updateNodes(routingNodes).build().validateRaiseException(clusterState.metaData());
reroute(allocation);
return new RoutingAllocation.Result(true, new RoutingTable.Builder().updateNodes(routingNodes).build().validateRaiseException(clusterState.metaData()), allocation.explanation());
}
/**
@ -76,16 +76,17 @@ public class ShardsAllocation extends AbstractComponent {
*
* <p>If the same instance of the routing table is returned, then no change has been made.
*/
public RoutingTable applyFailedShards(ClusterState clusterState, List<? extends ShardRouting> failedShards) {
public RoutingAllocation.Result applyFailedShards(ClusterState clusterState, List<? extends ShardRouting> failedShards) {
RoutingNodes routingNodes = clusterState.routingNodes();
nodeAllocations.applyFailedShards(nodeAllocations, routingNodes, clusterState.nodes(), failedShards);
boolean changed = applyFailedShards(routingNodes, failedShards);
FailedRerouteAllocation allocation = new FailedRerouteAllocation(routingNodes, clusterState.nodes(), failedShards);
nodeAllocations.applyFailedShards(nodeAllocations, allocation);
boolean changed = applyFailedShards(allocation);
if (!changed) {
return clusterState.routingTable();
return new RoutingAllocation.Result(false, clusterState.routingTable(), allocation.explanation());
}
// If we reroute again, the failed shard will try and be assigned to the same node, which we do no do in the applyFailedShards
// reroute(routingNodes, clusterState.nodes());
return new RoutingTable.Builder().updateNodes(routingNodes).build().validateRaiseException(clusterState.metaData());
return new RoutingAllocation.Result(true, new RoutingTable.Builder().updateNodes(routingNodes).build().validateRaiseException(clusterState.metaData()), allocation.explanation());
}
/**
@ -93,45 +94,46 @@ public class ShardsAllocation extends AbstractComponent {
*
* <p>If the same instance of the routing table is returned, then no change has been made.
*/
public RoutingTable reroute(ClusterState clusterState) {
public RoutingAllocation.Result reroute(ClusterState clusterState) {
RoutingNodes routingNodes = clusterState.routingNodes();
if (!reroute(routingNodes, clusterState.nodes())) {
return clusterState.routingTable();
RoutingAllocation allocation = new RoutingAllocation(routingNodes, clusterState.nodes());
if (!reroute(allocation)) {
return new RoutingAllocation.Result(false, clusterState.routingTable(), allocation.explanation());
}
return new RoutingTable.Builder().updateNodes(routingNodes).build().validateRaiseException(clusterState.metaData());
return new RoutingAllocation.Result(true, new RoutingTable.Builder().updateNodes(routingNodes).build().validateRaiseException(clusterState.metaData()), allocation.explanation());
}
private boolean reroute(RoutingNodes routingNodes, DiscoveryNodes nodes) {
Iterable<DiscoveryNode> dataNodes = nodes.dataNodes().values();
private boolean reroute(RoutingAllocation allocation) {
Iterable<DiscoveryNode> dataNodes = allocation.nodes().dataNodes().values();
boolean changed = false;
// first, clear from the shards any node id they used to belong to that is now dead
changed |= deassociateDeadNodes(routingNodes, dataNodes);
changed |= deassociateDeadNodes(allocation.routingNodes(), dataNodes);
// create a sorted list of from nodes with least number of shards to the maximum ones
applyNewNodes(routingNodes, dataNodes);
applyNewNodes(allocation.routingNodes(), dataNodes);
// elect primaries *before* allocating unassigned, so backups of primaries that failed
// will be moved to primary state and not wait for primaries to be allocated and recovered (*from gateway*)
changed |= electPrimaries(routingNodes);
changed |= electPrimaries(allocation.routingNodes());
// now allocate all the unassigned to available nodes
if (routingNodes.hasUnassigned()) {
changed |= nodeAllocations.allocateUnassigned(nodeAllocations, routingNodes, nodes);
changed |= allocateUnassigned(routingNodes);
if (allocation.routingNodes().hasUnassigned()) {
changed |= nodeAllocations.allocateUnassigned(nodeAllocations, allocation);
changed |= allocateUnassigned(allocation);
// elect primaries again, in case this is needed with unassigned allocation
changed |= electPrimaries(routingNodes);
changed |= electPrimaries(allocation.routingNodes());
}
// rebalance
changed |= rebalance(routingNodes, nodes);
changed |= rebalance(allocation);
return changed;
}
private boolean rebalance(RoutingNodes routingNodes, DiscoveryNodes nodes) {
private boolean rebalance(RoutingAllocation allocation) {
boolean changed = false;
List<RoutingNode> sortedNodesLeastToHigh = routingNodes.sortedNodesLeastToHigh();
List<RoutingNode> sortedNodesLeastToHigh = allocation.routingNodes().sortedNodesLeastToHigh();
if (sortedNodesLeastToHigh.isEmpty()) {
return false;
}
@ -143,7 +145,7 @@ public class ShardsAllocation extends AbstractComponent {
while (lowIndex != highIndex) {
RoutingNode lowRoutingNode = sortedNodesLeastToHigh.get(lowIndex);
RoutingNode highRoutingNode = sortedNodesLeastToHigh.get(highIndex);
int averageNumOfShards = routingNodes.requiredAverageNumberOfShardsPerNode();
int averageNumOfShards = allocation.routingNodes().requiredAverageNumberOfShardsPerNode();
// only active shards can be removed so must count only active ones.
if (highRoutingNode.numberOfOwningShards() <= averageNumOfShards) {
@ -159,11 +161,11 @@ public class ShardsAllocation extends AbstractComponent {
boolean relocated = false;
List<MutableShardRouting> startedShards = highRoutingNode.shardsWithState(STARTED);
for (MutableShardRouting startedShard : startedShards) {
if (!nodeAllocations.canRebalance(startedShard, routingNodes, nodes)) {
if (!nodeAllocations.canRebalance(startedShard, allocation)) {
continue;
}
if (nodeAllocations.canAllocate(startedShard, lowRoutingNode, routingNodes).allocate()) {
if (nodeAllocations.canAllocate(startedShard, lowRoutingNode, allocation).allocate()) {
changed = true;
lowRoutingNode.add(new MutableShardRouting(startedShard.index(), startedShard.id(),
lowRoutingNode.nodeId(), startedShard.currentNodeId(),
@ -214,8 +216,11 @@ public class ShardsAllocation extends AbstractComponent {
return changed;
}
private boolean allocateUnassigned(RoutingNodes routingNodes) {
private boolean allocateUnassigned(RoutingAllocation allocation) {
boolean changed = false;
RoutingNodes routingNodes = allocation.routingNodes();
List<RoutingNode> nodes = routingNodes.sortedNodesLeastToHigh();
Iterator<MutableShardRouting> unassignedIterator = routingNodes.unassigned().iterator();
@ -231,7 +236,7 @@ public class ShardsAllocation extends AbstractComponent {
lastNode = 0;
}
if (nodeAllocations.canAllocate(shard, node, routingNodes).allocate()) {
if (nodeAllocations.canAllocate(shard, node, allocation).allocate()) {
int numberOfShardsToAllocate = routingNodes.requiredAverageNumberOfShardsPerNode() - node.shards().size();
if (numberOfShardsToAllocate <= 0) {
continue;
@ -250,7 +255,7 @@ public class ShardsAllocation extends AbstractComponent {
MutableShardRouting shard = it.next();
// go over the nodes and try and allocate the remaining ones
for (RoutingNode routingNode : routingNodes.sortedNodesLeastToHigh()) {
if (nodeAllocations.canAllocate(shard, routingNode, routingNodes).allocate()) {
if (nodeAllocations.canAllocate(shard, routingNode, allocation).allocate()) {
changed = true;
routingNode.add(shard);
it.remove();
@ -379,15 +384,15 @@ public class ShardsAllocation extends AbstractComponent {
return dirty;
}
private boolean applyFailedShards(RoutingNodes routingNodes, Iterable<? extends ShardRouting> failedShardEntries) {
private boolean applyFailedShards(FailedRerouteAllocation allocation) {
boolean dirty = false;
// apply shards might be called several times with the same shard, ignore it
for (ShardRouting failedShard : failedShardEntries) {
for (ShardRouting failedShard : allocation.failedShards()) {
boolean shardDirty = false;
boolean inRelocation = failedShard.relocatingNodeId() != null;
if (inRelocation) {
RoutingNode routingNode = routingNodes.nodesToShards().get(failedShard.currentNodeId());
RoutingNode routingNode = allocation.routingNodes().nodesToShards().get(failedShard.currentNodeId());
if (routingNode != null) {
Iterator<MutableShardRouting> shards = routingNode.iterator();
while (shards.hasNext()) {
@ -403,7 +408,7 @@ public class ShardsAllocation extends AbstractComponent {
}
String nodeId = inRelocation ? failedShard.relocatingNodeId() : failedShard.currentNodeId();
RoutingNode currentRoutingNode = routingNodes.nodesToShards().get(nodeId);
RoutingNode currentRoutingNode = allocation.routingNodes().nodesToShards().get(nodeId);
if (currentRoutingNode == null) {
// already failed (might be called several times for the same shard)
@ -439,10 +444,10 @@ public class ShardsAllocation extends AbstractComponent {
// not in relocation so find a new target.
boolean allocated = false;
List<RoutingNode> sortedNodesLeastToHigh = routingNodes.sortedNodesLeastToHigh();
List<RoutingNode> sortedNodesLeastToHigh = allocation.routingNodes().sortedNodesLeastToHigh();
for (RoutingNode target : sortedNodesLeastToHigh) {
if (!target.nodeId().equals(failedShard.currentNodeId()) &&
nodeAllocations.canAllocate(failedShard, target, routingNodes).allocate()) {
nodeAllocations.canAllocate(failedShard, target, allocation).allocate()) {
target.add(new MutableShardRouting(failedShard.index(), failedShard.id(),
target.nodeId(), failedShard.relocatingNodeId(),
failedShard.primary(), INITIALIZING));
@ -452,7 +457,7 @@ public class ShardsAllocation extends AbstractComponent {
}
if (!allocated) {
// we did not manage to allocate it, put it in the unassigned
routingNodes.unassigned().add(new MutableShardRouting(failedShard.index(), failedShard.id(),
allocation.routingNodes().unassigned().add(new MutableShardRouting(failedShard.index(), failedShard.id(),
null, failedShard.primary(), ShardRoutingState.UNASSIGNED));
}
}

View File

@ -0,0 +1,43 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.routing.allocation;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import java.util.List;
/**
* @author kimchy (shay.banon)
*/
public class StartedRerouteAllocation extends RoutingAllocation {
private final List<? extends ShardRouting> startedShards;
public StartedRerouteAllocation(RoutingNodes routingNodes, DiscoveryNodes nodes, List<? extends ShardRouting> startedShards) {
super(routingNodes, nodes);
this.startedShards = startedShards;
}
public List<? extends ShardRouting> startedShards() {
return startedShards;
}
}

View File

@ -19,8 +19,10 @@
package org.elasticsearch.cluster.routing.allocation;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.cluster.routing.MutableShardRouting;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
@ -37,14 +39,10 @@ public class ThrottlingNodeAllocation extends NodeAllocation {
this.concurrentRecoveries = componentSettings.getAsInt("concurrent_recoveries", Runtime.getRuntime().availableProcessors() + 1);
}
@Override public boolean allocateUnassigned(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes) {
return false;
}
@Override public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingNodes routingNodes) {
@Override public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
if (shardRouting.primary()) {
boolean primaryUnassigned = false;
for (MutableShardRouting shard : routingNodes.unassigned()) {
for (MutableShardRouting shard : allocation.routingNodes().unassigned()) {
if (shard.shardId().equals(shardRouting.shardId())) {
primaryUnassigned = true;
}

View File

@ -26,8 +26,7 @@ import org.elasticsearch.cluster.routing.MutableShardRouting;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.NodeAllocation;
import org.elasticsearch.cluster.routing.allocation.NodeAllocations;
import org.elasticsearch.cluster.routing.allocation.*;
import org.elasticsearch.common.collect.Sets;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
@ -45,7 +44,6 @@ import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData;
import org.elasticsearch.transport.ConnectTransportException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
@ -74,23 +72,26 @@ public class BlobReuseExistingNodeAllocation extends NodeAllocation {
this.listTimeout = componentSettings.getAsTime("list_timeout", TimeValue.timeValueSeconds(30));
}
@Override public void applyStartedShards(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes, List<? extends ShardRouting> startedShards) {
for (ShardRouting shardRouting : startedShards) {
@Override public void applyStartedShards(NodeAllocations nodeAllocations, StartedRerouteAllocation allocation) {
for (ShardRouting shardRouting : allocation.startedShards()) {
cachedCommitPoints.remove(shardRouting.shardId());
cachedStores.remove(shardRouting.shardId());
}
}
@Override public void applyFailedShards(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes, List<? extends ShardRouting> failedShards) {
for (ShardRouting shardRouting : failedShards) {
@Override public void applyFailedShards(NodeAllocations nodeAllocations, FailedRerouteAllocation allocation) {
for (ShardRouting shardRouting : allocation.failedShards()) {
cachedCommitPoints.remove(shardRouting.shardId());
cachedStores.remove(shardRouting.shardId());
}
}
@Override public boolean allocateUnassigned(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes) {
@Override public boolean allocateUnassigned(NodeAllocations nodeAllocations, RoutingAllocation allocation) {
boolean changed = false;
DiscoveryNodes nodes = allocation.nodes();
RoutingNodes routingNodes = allocation.routingNodes();
if (nodes.dataNodes().isEmpty()) {
return changed;
}
@ -119,7 +120,7 @@ public class BlobReuseExistingNodeAllocation extends NodeAllocation {
continue;
}
// if its THROTTLING, we are not going to allocate it to this node, so ignore it as well
if (nodeAllocations.canAllocate(shard, node, routingNodes).allocate()) {
if (nodeAllocations.canAllocate(shard, node, allocation).allocate()) {
canBeAllocatedToAtLeastOneNode = true;
break;
}
@ -153,7 +154,7 @@ public class BlobReuseExistingNodeAllocation extends NodeAllocation {
// check if we can allocate on that node...
// we only check for NO, since if this node is THROTTLING and it has enough "same data"
// then we will try and assign it next time
if (nodeAllocations.canAllocate(shard, node, routingNodes) == Decision.NO) {
if (nodeAllocations.canAllocate(shard, node, allocation) == Decision.NO) {
continue;
}
@ -249,7 +250,7 @@ public class BlobReuseExistingNodeAllocation extends NodeAllocation {
}
if (lastNodeMatched != null) {
if (nodeAllocations.canAllocate(shard, lastNodeMatched, routingNodes) == NodeAllocation.Decision.THROTTLE) {
if (nodeAllocations.canAllocate(shard, lastNodeMatched, allocation) == NodeAllocation.Decision.THROTTLE) {
if (logger.isTraceEnabled()) {
logger.debug("[{}][{}]: throttling allocation [{}] to [{}] in order to reuse its unallocated persistent store with total_size [{}]", shard.index(), shard.id(), shard, lastDiscoNodeMatched, new ByteSizeValue(lastSizeMatched));
}
@ -333,8 +334,4 @@ public class BlobReuseExistingNodeAllocation extends NodeAllocation {
}
return shardStores;
}
@Override public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingNodes routingNodes) {
return Decision.YES;
}
}

View File

@ -25,8 +25,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.cluster.routing.allocation.NodeAllocation;
import org.elasticsearch.cluster.routing.allocation.NodeAllocations;
import org.elasticsearch.cluster.routing.allocation.*;
import org.elasticsearch.common.collect.Maps;
import org.elasticsearch.common.collect.Sets;
import org.elasticsearch.common.collect.Tuple;
@ -47,7 +46,6 @@ import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData;
import org.elasticsearch.transport.ConnectTransportException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
@ -82,26 +80,26 @@ public class LocalGatewayNodeAllocation extends NodeAllocation {
this.initialShards = componentSettings.get("initial_shards", "quorum");
}
@Override public void applyStartedShards(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes, List<? extends ShardRouting> startedShards) {
for (ShardRouting shardRouting : startedShards) {
@Override public void applyStartedShards(NodeAllocations nodeAllocations, StartedRerouteAllocation allocation) {
for (ShardRouting shardRouting : allocation.startedShards()) {
cachedStores.remove(shardRouting.shardId());
}
}
@Override public void applyFailedShards(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes, List<? extends ShardRouting> failedShards) {
for (ShardRouting shardRouting : failedShards) {
@Override public void applyFailedShards(NodeAllocations nodeAllocations, FailedRerouteAllocation allocation) {
for (ShardRouting shardRouting : allocation.failedShards()) {
cachedStores.remove(shardRouting.shardId());
}
for (ShardRouting failedShard : failedShards) {
IndexRoutingTable indexRoutingTable = routingNodes.routingTable().index(failedShard.index());
if (!routingNodes.blocks().hasIndexBlock(indexRoutingTable.index(), LocalGateway.INDEX_NOT_RECOVERED_BLOCK)) {
for (ShardRouting failedShard : allocation.failedShards()) {
IndexRoutingTable indexRoutingTable = allocation.routingTable().index(failedShard.index());
if (!allocation.routingNodes().blocks().hasIndexBlock(indexRoutingTable.index(), LocalGateway.INDEX_NOT_RECOVERED_BLOCK)) {
continue;
}
// we are still in the initial allocation, find another node with existing shards
// all primary are unassigned for the index, see if we can allocate it on existing nodes, if not, don't assign
Set<String> nodesIds = Sets.newHashSet();
nodesIds.addAll(nodes.dataNodes().keySet());
nodesIds.addAll(allocation.nodes().dataNodes().keySet());
TransportNodesListGatewayStartedShards.NodesLocalGatewayStartedShards nodesState = listGatewayStartedShards.list(nodesIds, null).actionGet();
// make a list of ShardId to Node, each one from the latest version
@ -125,7 +123,7 @@ public class LocalGatewayNodeAllocation extends NodeAllocation {
}
if (t != null) {
// we found a node to allocate to, do it
RoutingNode currentRoutingNode = routingNodes.nodesToShards().get(failedShard.currentNodeId());
RoutingNode currentRoutingNode = allocation.routingNodes().nodesToShards().get(failedShard.currentNodeId());
if (currentRoutingNode == null) {
// already failed (might be called several times for the same shard)
continue;
@ -142,7 +140,7 @@ public class LocalGatewayNodeAllocation extends NodeAllocation {
}
}
RoutingNode targetNode = routingNodes.nodesToShards().get(t.v1().id());
RoutingNode targetNode = allocation.routingNodes().nodesToShards().get(t.v1().id());
targetNode.add(new MutableShardRouting(failedShard.index(), failedShard.id(),
targetNode.nodeId(), failedShard.relocatingNodeId(),
failedShard.primary(), INITIALIZING));
@ -150,8 +148,10 @@ public class LocalGatewayNodeAllocation extends NodeAllocation {
}
}
@Override public boolean allocateUnassigned(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes) {
@Override public boolean allocateUnassigned(NodeAllocations nodeAllocations, RoutingAllocation allocation) {
boolean changed = false;
DiscoveryNodes nodes = allocation.nodes();
RoutingNodes routingNodes = allocation.routingNodes();
for (IndexRoutingTable indexRoutingTable : routingNodes.routingTable()) {
// only do the allocation if there is a local "INDEX NOT RECOVERED" block
@ -266,7 +266,7 @@ public class LocalGatewayNodeAllocation extends NodeAllocation {
continue;
}
// if its THROTTLING, we are not going to allocate it to this node, so ignore it as well
if (nodeAllocations.canAllocate(shard, node, routingNodes).allocate()) {
if (nodeAllocations.canAllocate(shard, node, allocation).allocate()) {
canBeAllocatedToAtLeastOneNode = true;
break;
}
@ -300,7 +300,7 @@ public class LocalGatewayNodeAllocation extends NodeAllocation {
// check if we can allocate on that node...
// we only check for NO, since if this node is THROTTLING and it has enough "same data"
// then we will try and assign it next time
if (nodeAllocations.canAllocate(shard, node, routingNodes) == Decision.NO) {
if (nodeAllocations.canAllocate(shard, node, allocation) == Decision.NO) {
continue;
}
@ -335,7 +335,7 @@ public class LocalGatewayNodeAllocation extends NodeAllocation {
}
if (lastNodeMatched != null) {
if (nodeAllocations.canAllocate(shard, lastNodeMatched, routingNodes) == NodeAllocation.Decision.THROTTLE) {
if (nodeAllocations.canAllocate(shard, lastNodeMatched, allocation) == NodeAllocation.Decision.THROTTLE) {
if (logger.isTraceEnabled()) {
logger.debug("[{}][{}]: throttling allocation [{}] to [{}] in order to reuse its unallocated persistent store with total_size [{}]", shard.index(), shard.id(), shard, lastDiscoNodeMatched, new ByteSizeValue(lastSizeMatched));
}

View File

@ -65,19 +65,19 @@ public class ElectReplicaAsPrimaryDuringRelocationTests {
logger.info("Adding two nodes and performing rerouting");
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().put(newNode("node1")).put(newNode("node2"))).build();
RoutingTable prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState);
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
logger.info("Start the primary shards");
RoutingNodes routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING));
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
logger.info("Start the replica shards");
routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING));
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
@ -89,7 +89,7 @@ public class ElectReplicaAsPrimaryDuringRelocationTests {
logger.info("Start another node and perform rerouting");
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes()).put(newNode("node3"))).build();
prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState);
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
logger.info("find the replica shard that gets relocated");
@ -105,7 +105,7 @@ public class ElectReplicaAsPrimaryDuringRelocationTests {
logger.info("kill the node [{}] of the primary shard for the relocating replica", indexShardRoutingTable.primaryShard().currentNodeId());
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes()).remove(indexShardRoutingTable.primaryShard().currentNodeId())).build();
prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState);
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
logger.info("make sure all the primary shards are active");

View File

@ -68,13 +68,13 @@ public class FailedShardsRoutingTests {
logger.info("Adding two nodes and performing rerouting");
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().put(newNode("node1")).put(newNode("node2"))).build();
RoutingTable prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState);
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
logger.info("Start the shards (primaries)");
RoutingNodes routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING));
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(prevRoutingTable != routingTable, equalTo(true));
@ -92,7 +92,7 @@ public class FailedShardsRoutingTests {
logger.info("Start the shards (backups)");
routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING));
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(prevRoutingTable != routingTable, equalTo(true));
@ -110,7 +110,7 @@ public class FailedShardsRoutingTests {
logger.info("Adding third node and reroute");
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes()).put(newNode("node3"))).build();
prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState);
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
@ -125,7 +125,7 @@ public class FailedShardsRoutingTests {
logger.info("Fail the shards on node 3");
routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyFailedShards(clusterState, routingNodes.node("node3").shardsWithState(INITIALIZING));
routingTable = strategy.applyFailedShards(clusterState, routingNodes.node("node3").shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
@ -137,7 +137,7 @@ public class FailedShardsRoutingTests {
logger.info("Do another reroute, should try and assign again to node 3");
prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState);
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
@ -152,7 +152,7 @@ public class FailedShardsRoutingTests {
logger.info("Start the shards on node 3");
routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node3").shardsWithState(INITIALIZING));
routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node3").shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
@ -192,7 +192,7 @@ public class FailedShardsRoutingTests {
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().put(newNode("node1"))).build();
RoutingTable prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState);
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(prevRoutingTable != routingTable, equalTo(true));
@ -210,7 +210,7 @@ public class FailedShardsRoutingTests {
logger.info("Add another node and perform rerouting");
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes()).put(newNode("node2"))).build();
prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState);
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
// nothing will change, since primary shards have not started yet
@ -229,7 +229,7 @@ public class FailedShardsRoutingTests {
logger.info("Start the primary shards");
RoutingNodes routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING));
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(prevRoutingTable != routingTable, equalTo(true));
@ -246,14 +246,14 @@ public class FailedShardsRoutingTests {
logger.info("Reroute, nothing should change");
prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState);
routingTable = strategy.reroute(clusterState).routingTable();
assertThat(prevRoutingTable == routingTable, equalTo(true));
logger.info("Fail backup shards on node2");
routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
List<MutableShardRouting> failedShards = routingNodes.node("node2").shardsWithState(INITIALIZING);
routingTable = strategy.applyFailedShards(clusterState, failedShards);
routingTable = strategy.applyFailedShards(clusterState, failedShards).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
@ -270,7 +270,7 @@ public class FailedShardsRoutingTests {
}
// fail them again...
routingTable = strategy.applyFailedShards(clusterState, failedShards);
routingTable = strategy.applyFailedShards(clusterState, failedShards).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();

View File

@ -64,25 +64,25 @@ public class PrimaryElectionRoutingTests {
logger.info("Adding two nodes and performing rerouting");
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().put(newNode("node1")).put(newNode("node2"))).build();
RoutingTable prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState);
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
logger.info("Start the primary shard (on node1)");
RoutingNodes routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING));
routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
logger.info("Start the backup shard (on node2)");
routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node2").shardsWithState(INITIALIZING));
routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node2").shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
logger.info("Adding third node and reroute and kill first node");
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes()).put(newNode("node3")).remove("node1")).build();
prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState);
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();

View File

@ -64,19 +64,19 @@ public class PrimaryNotRelocatedWhileBeingRecoveredTests {
logger.info("Adding two nodes and performing rerouting");
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().put(newNode("node1"))).build();
routingTable = strategy.reroute(clusterState);
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
logger.info("Start the primary shard (on node1)");
RoutingNodes routingNodes = clusterState.routingNodes();
routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING));
routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(5));
logger.info("start another node, replica will start recovering form primary");
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes()).put(newNode("node2"))).build();
routingTable = strategy.reroute(clusterState);
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(5));
@ -84,7 +84,7 @@ public class PrimaryNotRelocatedWhileBeingRecoveredTests {
logger.info("start another node, make sure the primary is not relocated");
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes()).put(newNode("node3"))).build();
routingTable = strategy.reroute(clusterState);
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(5));

View File

@ -74,7 +74,7 @@ public class RebalanceAfterActiveTests {
logger.info("start two nodes and fully start the shards");
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().put(newNode("node1")).put(newNode("node2"))).build();
RoutingTable prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState);
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
@ -86,7 +86,7 @@ public class RebalanceAfterActiveTests {
logger.info("start all the primary shards, replicas will start initializing");
RoutingNodes routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING));
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
@ -101,7 +101,7 @@ public class RebalanceAfterActiveTests {
.put(newNode("node3")).put(newNode("node4")).put(newNode("node5")).put(newNode("node6")).put(newNode("node7")).put(newNode("node8")).put(newNode("node9")).put(newNode("node10")))
.build();
prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState);
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
@ -114,7 +114,7 @@ public class RebalanceAfterActiveTests {
logger.info("start the replica shards, rebalancing should start");
routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING));
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
@ -125,7 +125,7 @@ public class RebalanceAfterActiveTests {
logger.info("complete relocation, other half of relocation should happen");
routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING));
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
@ -136,7 +136,7 @@ public class RebalanceAfterActiveTests {
logger.info("complete relocation, thats it!");
routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING));
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();

View File

@ -73,7 +73,7 @@ public class ReplicaAllocatedAfterPrimaryTests {
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().put(newNode("node1")).put(newNode("node2"))).build();
RoutingTable prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState);
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(prevRoutingTable != routingTable, equalTo(true));
@ -89,7 +89,7 @@ public class ReplicaAllocatedAfterPrimaryTests {
logger.info("Start all the primary shards");
RoutingNodes routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING));
routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(prevRoutingTable != routingTable, equalTo(true));

View File

@ -78,7 +78,7 @@ public class SingleShardNoReplicasRoutingTests {
logger.info("Adding one node and performing rerouting");
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().put(newNode("node1"))).build();
RoutingTable prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState);
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(routingTable.index("test").shards().size(), equalTo(1));
@ -90,14 +90,14 @@ public class SingleShardNoReplicasRoutingTests {
logger.info("Rerouting again, nothing should change");
prevRoutingTable = routingTable;
clusterState = newClusterStateBuilder().state(clusterState).build();
routingTable = strategy.reroute(clusterState);
routingTable = strategy.reroute(clusterState).routingTable();
assertThat(routingTable == prevRoutingTable, equalTo(true));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
logger.info("Marking the shard as started");
RoutingNodes routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING));
routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(routingTable != prevRoutingTable, equalTo(true));
@ -110,7 +110,7 @@ public class SingleShardNoReplicasRoutingTests {
logger.info("Starting another node and making sure nothing changed");
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes()).put(newNode("node2"))).build();
prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState);
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(routingTable == prevRoutingTable, equalTo(true));
@ -124,7 +124,7 @@ public class SingleShardNoReplicasRoutingTests {
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes()).remove("node1")).build();
prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState);
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(routingTable != prevRoutingTable, equalTo(true));
@ -137,14 +137,14 @@ public class SingleShardNoReplicasRoutingTests {
logger.info("Start another node, make sure that things remain the same (shard is in node2 and initializing)");
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes()).put(newNode("node3"))).build();
prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState);
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(routingTable == prevRoutingTable, equalTo(true));
logger.info("Start the shard on node 2");
routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node2").shardsWithState(INITIALIZING));
routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node2").shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(routingTable != prevRoutingTable, equalTo(true));
@ -179,7 +179,7 @@ public class SingleShardNoReplicasRoutingTests {
logger.info("Adding one node and rerouting");
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().put(newNode("node1"))).build();
RoutingTable prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState);
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(prevRoutingTable != routingTable, equalTo(true));
@ -193,7 +193,7 @@ public class SingleShardNoReplicasRoutingTests {
logger.info("Marking the shard as failed");
RoutingNodes routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyFailedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING));
routingTable = strategy.applyFailedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(prevRoutingTable != routingTable, equalTo(true));
@ -240,7 +240,7 @@ public class SingleShardNoReplicasRoutingTests {
}
RoutingTable prevRoutingTable = routingTable;
clusterState = newClusterStateBuilder().state(clusterState).nodes(nodesBuilder).build();
routingTable = strategy.reroute(clusterState);
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(prevRoutingTable != routingTable, equalTo(true));
@ -278,14 +278,14 @@ public class SingleShardNoReplicasRoutingTests {
}
prevRoutingTable = routingTable;
clusterState = newClusterStateBuilder().state(clusterState).nodes(nodesBuilder).build();
routingTable = strategy.reroute(clusterState);
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(prevRoutingTable != routingTable, equalTo(false));
logger.info("Marking the shard as started");
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING));
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(prevRoutingTable != routingTable, equalTo(true));
@ -339,7 +339,7 @@ public class SingleShardNoReplicasRoutingTests {
.nodes(newNodesBuilder().put(newNode("node1")).put(newNode("node2")).put(newNode("node3")))
.build();
RoutingTable prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState);
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(prevRoutingTable != routingTable, equalTo(true));
@ -362,13 +362,13 @@ public class SingleShardNoReplicasRoutingTests {
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState);
routingTable = strategy.reroute(clusterState).routingTable();
assertThat(prevRoutingTable == routingTable, equalTo(true));
routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING));
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(prevRoutingTable != routingTable, equalTo(true));
@ -384,7 +384,7 @@ public class SingleShardNoReplicasRoutingTests {
logger.info("Now, mark the relocated as started");
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING));
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
// routingTable = strategy.reroute(new RoutingStrategyInfo(metaData, routingTable), nodes);

View File

@ -73,7 +73,7 @@ public class SingleShardOneReplicaRoutingTests {
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().put(newNode("node1"))).build();
RoutingTable prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState);
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(prevRoutingTable != routingTable, equalTo(true));
@ -89,7 +89,7 @@ public class SingleShardOneReplicaRoutingTests {
logger.info("Add another node and perform rerouting, nothing will happen since primary shards not started");
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes()).put(newNode("node2"))).build();
prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState);
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(prevRoutingTable == routingTable, equalTo(true));
@ -97,7 +97,7 @@ public class SingleShardOneReplicaRoutingTests {
logger.info("Start the primary shard (on node1)");
RoutingNodes routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING));
routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(prevRoutingTable != routingTable, equalTo(true));
@ -114,13 +114,13 @@ public class SingleShardOneReplicaRoutingTests {
logger.info("Reroute, nothing should change");
prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState);
routingTable = strategy.reroute(clusterState).routingTable();
assertThat(prevRoutingTable == routingTable, equalTo(true));
logger.info("Start the backup shard");
routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node2").shardsWithState(INITIALIZING));
routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node2").shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(prevRoutingTable != routingTable, equalTo(true));
@ -137,7 +137,7 @@ public class SingleShardOneReplicaRoutingTests {
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes()).remove("node1")).build();
prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState);
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(prevRoutingTable != routingTable, equalTo(true));
@ -155,7 +155,7 @@ public class SingleShardOneReplicaRoutingTests {
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes()).put(newNode("node3"))).build();
prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState);
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(prevRoutingTable != routingTable, equalTo(true));

View File

@ -75,7 +75,7 @@ public class TenShardsOneReplicaRoutingTests {
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().put(newNode("node1"))).build();
RoutingTable prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState);
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(prevRoutingTable != routingTable, equalTo(true));
@ -93,7 +93,7 @@ public class TenShardsOneReplicaRoutingTests {
logger.info("Add another node and perform rerouting, nothing will happen since primary not started");
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes()).put(newNode("node2"))).build();
prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState);
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(prevRoutingTable == routingTable, equalTo(true));
@ -101,7 +101,7 @@ public class TenShardsOneReplicaRoutingTests {
logger.info("Start the primary shard (on node1)");
RoutingNodes routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING));
routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(prevRoutingTable != routingTable, equalTo(true));
@ -119,13 +119,13 @@ public class TenShardsOneReplicaRoutingTests {
logger.info("Reroute, nothing should change");
prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState);
routingTable = strategy.reroute(clusterState).routingTable();
assertThat(prevRoutingTable == routingTable, equalTo(true));
logger.info("Start the backup shard");
routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node2").shardsWithState(INITIALIZING));
routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node2").shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
@ -146,7 +146,7 @@ public class TenShardsOneReplicaRoutingTests {
logger.info("Add another node and perform rerouting");
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes()).put(newNode("node3"))).build();
prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState);
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
@ -161,7 +161,7 @@ public class TenShardsOneReplicaRoutingTests {
logger.info("Start the shards on node 3");
routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node3").shardsWithState(INITIALIZING));
routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node3").shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();

View File

@ -62,7 +62,7 @@ public class ThrottlingAllocationTests {
logger.info("start one node, do reroute, only 3 should initialize");
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().put(newNode("node1"))).build();
routingTable = strategy.reroute(clusterState);
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(0));
@ -70,7 +70,7 @@ public class ThrottlingAllocationTests {
assertThat(routingTable.shardsWithState(UNASSIGNED).size(), equalTo(17));
logger.info("start initializing, another 3 should initialize");
routingTable = strategy.applyStartedShards(clusterState, routingTable.shardsWithState(INITIALIZING));
routingTable = strategy.applyStartedShards(clusterState, routingTable.shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(3));
@ -78,7 +78,7 @@ public class ThrottlingAllocationTests {
assertThat(routingTable.shardsWithState(UNASSIGNED).size(), equalTo(14));
logger.info("start initializing, another 3 should initialize");
routingTable = strategy.applyStartedShards(clusterState, routingTable.shardsWithState(INITIALIZING));
routingTable = strategy.applyStartedShards(clusterState, routingTable.shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(6));
@ -86,7 +86,7 @@ public class ThrottlingAllocationTests {
assertThat(routingTable.shardsWithState(UNASSIGNED).size(), equalTo(11));
logger.info("start initializing, another 1 should initialize");
routingTable = strategy.applyStartedShards(clusterState, routingTable.shardsWithState(INITIALIZING));
routingTable = strategy.applyStartedShards(clusterState, routingTable.shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(9));
@ -94,7 +94,7 @@ public class ThrottlingAllocationTests {
assertThat(routingTable.shardsWithState(UNASSIGNED).size(), equalTo(10));
logger.info("start initializing, all primaries should be started");
routingTable = strategy.applyStartedShards(clusterState, routingTable.shardsWithState(INITIALIZING));
routingTable = strategy.applyStartedShards(clusterState, routingTable.shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(10));
@ -119,7 +119,7 @@ public class ThrottlingAllocationTests {
logger.info("start one node, do reroute, only 3 should initialize");
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().put(newNode("node1"))).build();
routingTable = strategy.reroute(clusterState);
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(0));
@ -127,7 +127,7 @@ public class ThrottlingAllocationTests {
assertThat(routingTable.shardsWithState(UNASSIGNED).size(), equalTo(7));
logger.info("start initializing, another 2 should initialize");
routingTable = strategy.applyStartedShards(clusterState, routingTable.shardsWithState(INITIALIZING));
routingTable = strategy.applyStartedShards(clusterState, routingTable.shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(3));
@ -135,7 +135,7 @@ public class ThrottlingAllocationTests {
assertThat(routingTable.shardsWithState(UNASSIGNED).size(), equalTo(5));
logger.info("start initializing, all primaries should be started");
routingTable = strategy.applyStartedShards(clusterState, routingTable.shardsWithState(INITIALIZING));
routingTable = strategy.applyStartedShards(clusterState, routingTable.shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(5));
@ -144,7 +144,7 @@ public class ThrottlingAllocationTests {
logger.info("start another node, replicas should start being allocated");
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes()).put(newNode("node2"))).build();
routingTable = strategy.reroute(clusterState);
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(5));
@ -152,7 +152,7 @@ public class ThrottlingAllocationTests {
assertThat(routingTable.shardsWithState(UNASSIGNED).size(), equalTo(2));
logger.info("start initializing replicas");
routingTable = strategy.applyStartedShards(clusterState, routingTable.shardsWithState(INITIALIZING));
routingTable = strategy.applyStartedShards(clusterState, routingTable.shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(8));
@ -160,7 +160,7 @@ public class ThrottlingAllocationTests {
assertThat(routingTable.shardsWithState(UNASSIGNED).size(), equalTo(0));
logger.info("start initializing replicas, all should be started");
routingTable = strategy.applyStartedShards(clusterState, routingTable.shardsWithState(INITIALIZING));
routingTable = strategy.applyStartedShards(clusterState, routingTable.shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(10));

View File

@ -56,19 +56,19 @@ public class UpdateNumberOfReplicasTests {
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().put(newNode("node1")).put(newNode("node2"))).build();
RoutingTable prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState);
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
logger.info("Start all the primary shards");
RoutingNodes routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING));
routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
logger.info("Start all the replica shards");
routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING));
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(prevRoutingTable != routingTable, equalTo(true));
@ -103,7 +103,7 @@ public class UpdateNumberOfReplicasTests {
logger.info("Add another node and start the added replica");
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes()).put(newNode("node3"))).build();
prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState);
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(prevRoutingTable != routingTable, equalTo(true));
@ -119,7 +119,7 @@ public class UpdateNumberOfReplicasTests {
routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING));
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(prevRoutingTable != routingTable, equalTo(true));
@ -153,7 +153,7 @@ public class UpdateNumberOfReplicasTests {
logger.info("do a reroute, should remain the same");
prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState);
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(prevRoutingTable != routingTable, equalTo(false));

View File

@ -57,7 +57,7 @@ public class ClusterSerializationTests {
ClusterState clusterState = newClusterStateBuilder().nodes(nodes).metaData(metaData).routingTable(routingTable).build();
ShardsAllocation strategy = new ShardsAllocation();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(strategy.reroute(clusterState)).build();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(strategy.reroute(clusterState).routingTable()).build();
ClusterState serializedClusterState = ClusterState.Builder.fromBytes(ClusterState.Builder.toBytes(clusterState), ImmutableSettings.settingsBuilder().build(), newNode("node1"));
@ -79,7 +79,7 @@ public class ClusterSerializationTests {
ClusterState clusterState = newClusterStateBuilder().nodes(nodes).metaData(metaData).routingTable(routingTable).build();
ShardsAllocation strategy = new ShardsAllocation();
RoutingTable source = strategy.reroute(clusterState);
RoutingTable source = strategy.reroute(clusterState).routingTable();
BytesStreamOutput outStream = new BytesStreamOutput();
RoutingTable.Builder.writeTo(source, outStream);