Add `explain` flag support to the reroute API

By specifying the `explain` flag, an explanation for the reason a
command can or cannot be executed is returned. No allocation commands
are actually performed.

Returns a response similar to:

{
  "state": {...cluster state...},
  "acknowledged": true,
  "explanations" : [ {
    "command" : "cancel",
      "parameters" : {
        "index" : "decide",
        "shard" : 0,
        "node" : "IvpoKRdtRiGrQ_WKtt4_4w",
        "allow_primary" : false
      },
      "decisions" : [ {
        "decider" : "cancel_allocation_command",
        "decision" : "YES",
        "explanation" : "..."
        } ]
     }, {
      "command" : "move",
      "parameters" : {
        "index" : "decide",
        "shard" : 0,
        "from_node" : "IvpoKRdtRiGrQ_WKtt4_4w",
        "to_node" : "IvpoKRdtRiGrQ_WKtt4_4w"
       },
       "decisions" : [ {
         "decider" : "same_shard",
         "decision" : "NO",
         "explanation" : "shard cannot be allocated on same node [IvpoKRdtRiGrQ_WKtt4_4w] it already exists on"
       },
       etc
       ]
  }]
}

also removes AllocationExplanation from cluster state

Closes #2483
Closes #5169
This commit is contained in:
Lee Hinman 2014-01-31 16:50:32 -07:00
parent 8ceb98752d
commit e53a43800e
36 changed files with 744 additions and 201 deletions

View File

@ -11,12 +11,11 @@ Here is a short example of how a simple reroute API call:
[source,js]
--------------------------------------------------
curl -XPOST 'localhost:9200/_cluster/reroute' -d '{
"commands" : [ {
"move" :
"move" :
{
"index" : "test", "shard" : 0,
"index" : "test", "shard" : 0,
"from_node" : "node1", "to_node" : "node2"
}
},
@ -45,15 +44,18 @@ the request body). This will cause the commands to apply to the current
cluster state, and return the resulting cluster after the commands (and
re-balancing) has been applied.
The commands supported are:
If the `explain` parameter is specified, a detailed explanation of why the
commands could or could not be executed is returned.
`move`::
The commands supported are:
`move`::
Move a started shard from one node to another node. Accepts
`index` and `shard` for index name and shard number, `from_node` for the
node to move the shard `from`, and `to_node` for the node to move the
shard to.
shard to.
`cancel`::
`cancel`::
Cancel allocation of a shard (or recovery). Accepts `index`
and `shard` for index name and shard number, and `node` for the node to
cancel the shard allocation on. It also accepts `allow_primary` flag to
@ -62,7 +64,7 @@ The commands supported are:
from the primary shard by cancelling them and allowing them to be
reinitialized through the standard reallocation process.
`allocate`::
`allocate`::
Allocate an unassigned shard to a node. Accepts the
`index` and `shard` for index name and shard number, and `node` to
allocate the shard to. It also accepts `allow_primary` flag to

View File

@ -12,6 +12,10 @@
"type" : "boolean",
"description" : "Simulate the operation only and return the resulting state"
},
"explain": {
"type" : "boolean",
"description" : "Return an explanation of why the commands can or cannot be executed"
},
"filter_metadata": {
"type" : "boolean",
"description" : "Don't return cluster state metadata (default: false)"

View File

@ -0,0 +1,49 @@
setup:
- do:
indices.create:
index: test_index
body:
settings:
number_of_shards: "1"
number_of_replicas: "0"
- do:
cluster.health:
wait_for_status: green
---
"Explain API with empty command list":
- do:
cluster.reroute:
explain: true
dry_run: true
body:
commands: []
- match: {explanations: []}
---
"Explain API for non-existant node & shard":
- do:
cluster.reroute:
explain: true
dry_run: true
body:
commands:
- cancel:
index: test_index
shard: 9
node: node_0
- match: {explanations.0.command: cancel}
- match:
explanations.0.parameters:
index: test_index
shard: 9
node: node_0
allow_primary: false
- match: {explanations.0.decisions.0.decider: cancel_allocation_command}
- match: {explanations.0.decisions.0.decision: "NO"}
- is_true: explanations.0.decisions.0.explanation

View File

@ -20,7 +20,6 @@ setup:
- is_false: metadata
- is_false: routing_table
- is_false: routing_nodes
- is_false: allocations
- length: { blocks: 0 }
---
@ -41,7 +40,6 @@ setup:
- is_false: metadata
- is_false: routing_table
- is_false: routing_nodes
- is_false: allocations
- length: { blocks: 1 }
---
@ -55,7 +53,6 @@ setup:
- is_false: metadata
- is_false: routing_table
- is_false: routing_nodes
- is_false: allocations
---
"Filtering the cluster state by metadata only should work":
@ -68,7 +65,6 @@ setup:
- is_true: metadata
- is_false: routing_table
- is_false: routing_nodes
- is_false: allocations
---
@ -82,7 +78,6 @@ setup:
- is_false: metadata
- is_true: routing_table
- is_true: routing_nodes
- is_true: allocations
---
@ -120,7 +115,6 @@ setup:
- is_true: metadata
- is_false: routing_table
- is_false: routing_nodes
- is_false: allocations
- is_true: metadata.templates.test1
- is_true: metadata.templates.test2
- is_false: metadata.templates.foo
@ -160,5 +154,3 @@ setup:
- is_true: metadata
- is_true: routing_table
- is_true: routing_nodes
- is_true: allocations

View File

@ -20,6 +20,7 @@
package org.elasticsearch.action.admin.cluster.reroute;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommand;
@ -39,6 +40,7 @@ public class ClusterRerouteRequest extends AcknowledgedRequest<ClusterRerouteReq
AllocationCommands commands = new AllocationCommands();
boolean dryRun;
boolean explain;
public ClusterRerouteRequest() {
}
@ -69,6 +71,23 @@ public class ClusterRerouteRequest extends AcknowledgedRequest<ClusterRerouteReq
return this.dryRun;
}
/**
* Sets the explain flag, which will collect information about the reroute
* request without executing the actions. Similar to dryRun,
* but human-readable.
*/
public ClusterRerouteRequest explain(boolean explain) {
this.explain = explain;
return this;
}
/**
* Returns the current explain flag
*/
public boolean explain() {
return this.explain;
}
/**
* Sets the source for the request.
*/
@ -110,6 +129,11 @@ public class ClusterRerouteRequest extends AcknowledgedRequest<ClusterRerouteReq
super.readFrom(in);
commands = AllocationCommands.readFrom(in);
dryRun = in.readBoolean();
if (in.getVersion().onOrAfter(Version.V_1_1_0)) {
explain = in.readBoolean();
} else {
explain = false;
}
readTimeout(in);
}
@ -118,6 +142,9 @@ public class ClusterRerouteRequest extends AcknowledgedRequest<ClusterRerouteReq
super.writeTo(out);
AllocationCommands.writeTo(commands, out);
out.writeBoolean(dryRun);
if (out.getVersion().onOrAfter(Version.V_1_1_0)) {
out.writeBoolean(explain);
}
writeTimeout(out);
}
}

View File

@ -53,6 +53,15 @@ public class ClusterRerouteRequestBuilder extends AcknowledgedRequestBuilder<Clu
return this;
}
/**
* Sets the explain flag (defaults to <tt>false</tt>). If true, the
* request will include an explanation in addition to the cluster state.
*/
public ClusterRerouteRequestBuilder setExplain(boolean explain) {
request.explain(explain);
return this;
}
/**
* Sets the source for the request
*/

View File

@ -19,8 +19,11 @@
package org.elasticsearch.action.admin.cluster.reroute;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.allocation.RoutingExplanations;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -32,14 +35,16 @@ import java.io.IOException;
public class ClusterRerouteResponse extends AcknowledgedResponse {
private ClusterState state;
private RoutingExplanations explanations;
ClusterRerouteResponse() {
}
ClusterRerouteResponse(boolean acknowledged, ClusterState state) {
ClusterRerouteResponse(boolean acknowledged, ClusterState state, RoutingExplanations explanations) {
super(acknowledged);
this.state = state;
this.explanations = explanations;
}
/**
@ -49,11 +54,20 @@ public class ClusterRerouteResponse extends AcknowledgedResponse {
return this.state;
}
public RoutingExplanations getExplanations() {
return this.explanations;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
state = ClusterState.Builder.readFrom(in, null);
readAcknowledged(in);
if (in.getVersion().onOrAfter(Version.V_1_1_0)) {
explanations = RoutingExplanations.readFrom(in);
} else {
explanations = new RoutingExplanations();
}
}
@Override
@ -61,5 +75,8 @@ public class ClusterRerouteResponse extends AcknowledgedResponse {
super.writeTo(out);
ClusterState.Builder.writeTo(state, out);
writeAcknowledged(out);
if (out.getVersion().onOrAfter(Version.V_1_1_0)) {
RoutingExplanations.writeTo(explanations, out);
}
}
}

View File

@ -26,6 +26,7 @@ import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.allocation.RoutingExplanations;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.Nullable;
@ -75,6 +76,7 @@ public class TransportClusterRerouteAction extends TransportMasterNodeOperationA
clusterService.submitStateUpdateTask("cluster_reroute (api)", Priority.URGENT, new AckedClusterStateUpdateTask() {
private volatile ClusterState clusterStateToSend;
private volatile RoutingExplanations explanations;
@Override
public boolean mustAck(DiscoveryNode discoveryNode) {
@ -83,12 +85,12 @@ public class TransportClusterRerouteAction extends TransportMasterNodeOperationA
@Override
public void onAllNodesAcked(@Nullable Throwable t) {
listener.onResponse(new ClusterRerouteResponse(true, clusterStateToSend));
listener.onResponse(new ClusterRerouteResponse(true, clusterStateToSend, explanations));
}
@Override
public void onAckTimeout() {
listener.onResponse(new ClusterRerouteResponse(false, clusterStateToSend));
listener.onResponse(new ClusterRerouteResponse(false, clusterStateToSend, new RoutingExplanations()));
}
@Override
@ -109,9 +111,10 @@ public class TransportClusterRerouteAction extends TransportMasterNodeOperationA
@Override
public ClusterState execute(ClusterState currentState) {
RoutingAllocation.Result routingResult = allocationService.reroute(currentState, request.commands, true);
RoutingAllocation.Result routingResult = allocationService.reroute(currentState, request.commands, request.explain());
ClusterState newState = ClusterState.builder(currentState).routingResult(routingResult).build();
clusterStateToSend = newState;
explanations = routingResult.explanations();
if (request.dryRun) {
return currentState;
}

View File

@ -90,7 +90,6 @@ public class TransportClusterStateAction extends TransportMasterNodeReadOperatio
} else {
builder.routingTable(currentState.routingTable());
}
builder.allocationExplanation(currentState.allocationExplanation());
}
if (request.blocks()) {
builder.blocks(currentState.blocks());

View File

@ -23,6 +23,7 @@ import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import com.google.common.collect.ImmutableSet;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetaData;
@ -48,10 +49,12 @@ import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.shard.ShardId;
import java.io.IOException;
import java.util.*;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
/**
*
@ -105,8 +108,6 @@ public class ClusterState implements ToXContent {
private final ClusterBlocks blocks;
private final AllocationExplanation allocationExplanation;
private final ImmutableOpenMap<String, Custom> customs;
// built on demand
@ -115,16 +116,15 @@ public class ClusterState implements ToXContent {
private SettingsFilter settingsFilter;
public ClusterState(long version, ClusterState state) {
this(version, state.metaData(), state.routingTable(), state.nodes(), state.blocks(), state.allocationExplanation(), state.customs());
this(version, state.metaData(), state.routingTable(), state.nodes(), state.blocks(), state.customs());
}
public ClusterState(long version, MetaData metaData, RoutingTable routingTable, DiscoveryNodes nodes, ClusterBlocks blocks, AllocationExplanation allocationExplanation, ImmutableOpenMap<String, Custom> customs) {
public ClusterState(long version, MetaData metaData, RoutingTable routingTable, DiscoveryNodes nodes, ClusterBlocks blocks, ImmutableOpenMap<String, Custom> customs) {
this.version = version;
this.metaData = metaData;
this.routingTable = routingTable;
this.nodes = nodes;
this.blocks = blocks;
this.allocationExplanation = allocationExplanation;
this.customs = customs;
}
@ -176,14 +176,6 @@ public class ClusterState implements ToXContent {
return blocks;
}
public AllocationExplanation allocationExplanation() {
return this.allocationExplanation;
}
public AllocationExplanation getAllocationExplanation() {
return allocationExplanation();
}
public ImmutableOpenMap<String, Custom> customs() {
return this.customs;
}
@ -417,28 +409,6 @@ public class ClusterState implements ToXContent {
builder.endObject();
}
if (isAllMetricsOnly || metrics.contains("routing_table")) {
builder.startArray("allocations");
for (Map.Entry<ShardId, List<AllocationExplanation.NodeExplanation>> entry : allocationExplanation().explanations().entrySet()) {
builder.startObject();
builder.field("index", entry.getKey().index().name());
builder.field("shard", entry.getKey().id());
builder.startArray("explanations");
for (AllocationExplanation.NodeExplanation nodeExplanation : entry.getValue()) {
builder.field("desc", nodeExplanation.description());
if (nodeExplanation.node() != null) {
builder.startObject("node");
builder.field("id", nodeExplanation.node().id());
builder.field("name", nodeExplanation.node().name());
builder.endObject();
}
}
builder.endArray();
builder.endObject();
}
builder.endArray();
}
if (isAllMetricsOnly || metrics.contains("customs")) {
for (ObjectObjectCursor<String, Custom> cursor : customs) {
builder.startObject(cursor.key);
@ -465,7 +435,6 @@ public class ClusterState implements ToXContent {
private RoutingTable routingTable = RoutingTable.EMPTY_ROUTING_TABLE;
private DiscoveryNodes nodes = DiscoveryNodes.EMPTY_NODES;
private ClusterBlocks blocks = ClusterBlocks.EMPTY_CLUSTER_BLOCK;
private AllocationExplanation allocationExplanation = AllocationExplanation.EMPTY;
private final ImmutableOpenMap.Builder<String, Custom> customs;
public Builder() {
@ -478,7 +447,6 @@ public class ClusterState implements ToXContent {
this.routingTable = state.routingTable();
this.metaData = state.metaData();
this.blocks = state.blocks();
this.allocationExplanation = state.allocationExplanation();
this.customs = ImmutableOpenMap.builder(state.customs());
}
@ -497,7 +465,6 @@ public class ClusterState implements ToXContent {
public Builder routingResult(RoutingAllocation.Result routingResult) {
this.routingTable = routingResult.routingTable();
this.allocationExplanation = routingResult.explanation();
return this;
}
@ -524,11 +491,6 @@ public class ClusterState implements ToXContent {
return this;
}
public Builder allocationExplanation(AllocationExplanation allocationExplanation) {
this.allocationExplanation = allocationExplanation;
return this;
}
public Builder version(long version) {
this.version = version;
return this;
@ -549,7 +511,7 @@ public class ClusterState implements ToXContent {
}
public ClusterState build() {
return new ClusterState(version, metaData, routingTable, nodes, blocks, allocationExplanation, customs.build());
return new ClusterState(version, metaData, routingTable, nodes, blocks, customs.build());
}
public static byte[] toBytes(ClusterState state) throws IOException {
@ -568,7 +530,10 @@ public class ClusterState implements ToXContent {
RoutingTable.Builder.writeTo(state.routingTable(), out);
DiscoveryNodes.Builder.writeTo(state.nodes(), out);
ClusterBlocks.Builder.writeClusterBlocks(state.blocks(), out);
state.allocationExplanation().writeTo(out);
if (out.getVersion().before(Version.V_1_1_0)) {
// Versions before 1.1.0 are expecting AllocationExplanation
AllocationExplanation.EMPTY.writeTo(out);
}
out.writeVInt(state.customs().size());
for (ObjectObjectCursor<String, Custom> cursor : state.customs()) {
out.writeString(cursor.key);
@ -583,7 +548,10 @@ public class ClusterState implements ToXContent {
builder.routingTable = RoutingTable.Builder.readFrom(in);
builder.nodes = DiscoveryNodes.Builder.readFrom(in, localNode);
builder.blocks = ClusterBlocks.Builder.readClusterBlocks(in);
builder.allocationExplanation = AllocationExplanation.readAllocationExplanation(in);
if (in.getVersion().before(Version.V_1_1_0)) {
// Ignore the explanation read, since after 1.1.0 it's not part of the cluster state
AllocationExplanation.readAllocationExplanation(in);
}
int customSize = in.readVInt();
for (int i = 0; i < customSize; i++) {
String type = in.readString();

View File

@ -79,13 +79,13 @@ public class AllocationService extends AbstractComponent {
StartedRerouteAllocation allocation = new StartedRerouteAllocation(allocationDeciders, routingNodes, clusterState.nodes(), startedShards, clusterInfoService.getClusterInfo());
boolean changed = applyStartedShards(routingNodes, startedShards);
if (!changed) {
return new RoutingAllocation.Result(false, clusterState.routingTable(), allocation.explanation());
return new RoutingAllocation.Result(false, clusterState.routingTable());
}
shardsAllocators.applyStartedShards(allocation);
if (withReroute) {
reroute(allocation);
}
return new RoutingAllocation.Result(true, new RoutingTable.Builder().updateNodes(routingNodes).build().validateRaiseException(clusterState.metaData()), allocation.explanation());
return new RoutingAllocation.Result(true, new RoutingTable.Builder().updateNodes(routingNodes).build().validateRaiseException(clusterState.metaData()));
}
public RoutingAllocation.Result applyFailedShard(ClusterState clusterState, ShardRouting failedShard) {
@ -107,33 +107,34 @@ public class AllocationService extends AbstractComponent {
changed |= applyFailedShard(allocation, failedShard, true);
}
if (!changed) {
return new RoutingAllocation.Result(false, clusterState.routingTable(), allocation.explanation());
return new RoutingAllocation.Result(false, clusterState.routingTable());
}
shardsAllocators.applyFailedShards(allocation);
reroute(allocation);
return new RoutingAllocation.Result(true, new RoutingTable.Builder().updateNodes(routingNodes).build().validateRaiseException(clusterState.metaData()), allocation.explanation());
return new RoutingAllocation.Result(true, new RoutingTable.Builder().updateNodes(routingNodes).build().validateRaiseException(clusterState.metaData()));
}
public RoutingAllocation.Result reroute(ClusterState clusterState, AllocationCommands commands) {
return reroute(clusterState, commands, false);
}
public RoutingAllocation.Result reroute(ClusterState clusterState, AllocationCommands commands, boolean debug) throws ElasticsearchException {
public RoutingAllocation.Result reroute(ClusterState clusterState, AllocationCommands commands, boolean explain) throws ElasticsearchException {
RoutingNodes routingNodes = clusterState.routingNodes();
// we don't shuffle the unassigned shards here, to try and get as close as possible to
// a consistent result of the effect the commands have on the routing
// this allows systems to dry run the commands, see the resulting cluster state, and act on it
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState.nodes(), clusterInfoService.getClusterInfo());
allocation.debugDecision(debug);
// don't short circuit deciders, we want a full explanation
allocation.debugDecision(true);
// we ignore disable allocation, because commands are explicit
allocation.ignoreDisable(true);
commands.execute(allocation);
RoutingExplanations explanations = commands.execute(allocation, explain);
// we revert the ignore disable flag, since when rerouting, we want the original setting to take place
allocation.ignoreDisable(false);
// the assumption is that commands will move / act on shards (or fail through exceptions)
// so, there will always be shard "movements", so no need to check on reroute
reroute(allocation);
return new RoutingAllocation.Result(true, new RoutingTable.Builder().updateNodes(routingNodes).build().validateRaiseException(clusterState.metaData()), allocation.explanation());
return new RoutingAllocation.Result(true, new RoutingTable.Builder().updateNodes(routingNodes).build().validateRaiseException(clusterState.metaData()), explanations);
}
/**
@ -157,9 +158,9 @@ public class AllocationService extends AbstractComponent {
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState.nodes(), clusterInfoService.getClusterInfo());
allocation.debugDecision(debug);
if (!reroute(allocation)) {
return new RoutingAllocation.Result(false, clusterState.routingTable(), allocation.explanation());
return new RoutingAllocation.Result(false, clusterState.routingTable());
}
return new RoutingAllocation.Result(true, new RoutingTable.Builder().updateNodes(routingNodes).build().validateRaiseException(clusterState.metaData()), allocation.explanation());
return new RoutingAllocation.Result(true, new RoutingTable.Builder().updateNodes(routingNodes).build().validateRaiseException(clusterState.metaData()));
}
/**
@ -194,9 +195,9 @@ public class AllocationService extends AbstractComponent {
changed |= electPrimariesAndUnassignDanglingReplicas(allocation);
if (!changed) {
return new RoutingAllocation.Result(false, clusterState.routingTable(), allocation.explanation());
return new RoutingAllocation.Result(false, clusterState.routingTable());
}
return new RoutingAllocation.Result(true, new RoutingTable.Builder().updateNodes(routingNodes).build().validateRaiseException(clusterState.metaData()), allocation.explanation());
return new RoutingAllocation.Result(true, new RoutingTable.Builder().updateNodes(routingNodes).build().validateRaiseException(clusterState.metaData()));
}
private boolean reroute(RoutingAllocation allocation) {

View File

@ -0,0 +1,84 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.routing.allocation;
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommand;
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
/**
* Class encapsulating the explanation for a single {@link AllocationCommand}
* taken from the Deciders
*/
public class RerouteExplanation implements ToXContent {
private AllocationCommand command;
private Decision decisions;
public RerouteExplanation(AllocationCommand command, Decision decisions) {
this.command = command;
this.decisions = decisions;
}
public AllocationCommand command() {
return this.command;
}
public Decision decisions() {
return this.decisions;
}
public static RerouteExplanation readFrom(StreamInput in) throws IOException {
String commandName = in.readString();
AllocationCommand command = AllocationCommands.lookupFactorySafe(commandName).readFrom(in);
Decision decisions = Decision.readFrom(in);
return new RerouteExplanation(command, decisions);
}
public static void writeTo(RerouteExplanation explanation, StreamOutput out) throws IOException {
out.writeString(explanation.command.name());
AllocationCommands.lookupFactorySafe(explanation.command.name()).writeTo(explanation.command, out);
Decision.writeTo(explanation.decisions, out);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("command", command.name());
AllocationCommands.lookupFactorySafe(command.name()).toXContent(command, builder, params, "parameters");
// The Decision could be a Multi or Single decision, and they should
// both be encoded the same, so check and wrap in an array if necessary
if (decisions instanceof Decision.Multi) {
decisions.toXContent(builder, params);
} else {
builder.startArray("decisions");
decisions.toXContent(builder, params);
builder.endArray();
}
builder.endObject();
return builder;
}
}

View File

@ -47,19 +47,30 @@ public class RoutingAllocation {
private final RoutingTable routingTable;
private final AllocationExplanation explanation;
private RoutingExplanations explanations = new RoutingExplanations();
/**
* Creates a new {@link RoutingAllocation.Result}
*
* @param changed a flag to determine whether the actual {@link RoutingTable} has been changed
* @param routingTable the {@link RoutingTable} this Result references
*/
public Result(boolean changed, RoutingTable routingTable) {
this.changed = changed;
this.routingTable = routingTable;
}
/**
* Creates a new {@link RoutingAllocation.Result}
*
* @param changed a flag to determine whether the actual {@link RoutingTable} has been changed
* @param routingTable the {@link RoutingTable} this Result references
* @param explanation Explanation of the Result
* @param explanations Explanation for the reroute actions
*/
public Result(boolean changed, RoutingTable routingTable, AllocationExplanation explanation) {
public Result(boolean changed, RoutingTable routingTable, RoutingExplanations explanations) {
this.changed = changed;
this.routingTable = routingTable;
this.explanation = explanation;
this.explanations = explanations;
}
/** determine whether the actual {@link RoutingTable} has been changed
@ -81,8 +92,8 @@ public class RoutingAllocation {
* Get the explanation of this result
* @return explanation
*/
public AllocationExplanation explanation() {
return explanation;
public RoutingExplanations explanations() {
return explanations;
}
}
@ -198,10 +209,14 @@ public class RoutingAllocation {
/**
* Create a routing decision, including the reason if the debug flag is
* turned on
* @param decision decision whether to allow/deny allocation
* @param deciderLabel a human readable label for the AllocationDecider
* @param reason a format string explanation of the decision
* @param params format string parameters
*/
public Decision decision(Decision decision, String reason, Object... params) {
public Decision decision(Decision decision, String deciderLabel, String reason, Object... params) {
if (debugDecision()) {
return Decision.single(decision.type(), reason, params);
return Decision.single(decision.type(), deciderLabel, reason, params);
} else {
return decision;
}

View File

@ -0,0 +1,84 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.routing.allocation;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.List;
import static com.google.common.collect.Lists.newArrayList;
/**
* Class used to encapsulate a number of {@link RerouteExplanation}
* explanations.
*/
public class RoutingExplanations implements ToXContent {
private final List<RerouteExplanation> explanations;
public RoutingExplanations() {
this.explanations = newArrayList();
}
public RoutingExplanations add(RerouteExplanation explanation) {
this.explanations.add(explanation);
return this;
}
public List<RerouteExplanation> explanations() {
return this.explanations;
}
/**
* Read in a RoutingExplanations object
*/
public static RoutingExplanations readFrom(StreamInput in) throws IOException {
int exCount = in.readVInt();
RoutingExplanations exp = new RoutingExplanations();
for (int i = 0; i < exCount; i++) {
RerouteExplanation explanation = RerouteExplanation.readFrom(in);
exp.add(explanation);
}
return exp;
}
/**
* Write the RoutingExplanations object
*/
public static void writeTo(RoutingExplanations explanations, StreamOutput out) throws IOException {
out.writeVInt(explanations.explanations.size());
for (RerouteExplanation explanation : explanations.explanations) {
RerouteExplanation.writeTo(explanation, out);
}
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startArray("explanations");
for (RerouteExplanation explanation : explanations) {
explanation.toXContent(builder, params);
}
builder.endArray();
return builder;
}
}

View File

@ -26,6 +26,7 @@ import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.MutableShardRouting;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.allocation.RerouteExplanation;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.io.stream.StreamInput;
@ -102,9 +103,13 @@ public class AllocateAllocationCommand implements AllocationCommand {
}
@Override
public void toXContent(AllocateAllocationCommand command, XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startObject();
builder.field("index", command.shardId().index());
public void toXContent(AllocateAllocationCommand command, XContentBuilder builder, ToXContent.Params params, String objectName) throws IOException {
if (objectName == null) {
builder.startObject();
} else {
builder.startObject(objectName);
}
builder.field("index", command.shardId().index().name());
builder.field("shard", command.shardId().id());
builder.field("node", command.node());
builder.field("allow_primary", command.allowPrimary());
@ -162,7 +167,7 @@ public class AllocateAllocationCommand implements AllocationCommand {
}
@Override
public void execute(RoutingAllocation allocation) throws ElasticsearchException {
public RerouteExplanation execute(RoutingAllocation allocation, boolean explain) throws ElasticsearchException {
DiscoveryNode discoNode = allocation.nodes().resolveNode(node);
MutableShardRouting shardRouting = null;
@ -176,24 +181,43 @@ public class AllocateAllocationCommand implements AllocationCommand {
}
if (shardRouting == null) {
if (explain) {
return new RerouteExplanation(this, allocation.decision(Decision.NO, "allocate_allocation_command",
"failed to find " + shardId + " on the list of unassigned shards"));
}
throw new ElasticsearchIllegalArgumentException("[allocate] failed to find " + shardId + " on the list of unassigned shards");
}
if (shardRouting.primary() && !allowPrimary) {
throw new ElasticsearchIllegalArgumentException("[allocate] trying to allocate a primary shard " + shardId + "], which is disabled");
if (explain) {
return new RerouteExplanation(this, allocation.decision(Decision.NO, "allocate_allocation_command",
"trying to allocate a primary shard " + shardId + ", which is disabled"));
}
throw new ElasticsearchIllegalArgumentException("[allocate] trying to allocate a primary shard " + shardId + ", which is disabled");
}
RoutingNode routingNode = allocation.routingNodes().node(discoNode.id());
if (routingNode == null) {
if (!discoNode.dataNode()) {
if (explain) {
return new RerouteExplanation(this, allocation.decision(Decision.NO, "allocate_allocation_command",
"Allocation can only be done on data nodes, not [" + node + "]"));
}
throw new ElasticsearchIllegalArgumentException("Allocation can only be done on data nodes, not [" + node + "]");
} else {
if (explain) {
return new RerouteExplanation(this, allocation.decision(Decision.NO, "allocate_allocation_command",
"Could not find [" + node + "] among the routing nodes"));
}
throw new ElasticsearchIllegalStateException("Could not find [" + node + "] among the routing nodes");
}
}
Decision decision = allocation.deciders().canAllocate(shardRouting, routingNode, allocation);
if (decision.type() == Decision.Type.NO) {
if (explain) {
return new RerouteExplanation(this, decision);
}
throw new ElasticsearchIllegalArgumentException("[allocate] allocation of " + shardId + " on node " + discoNode + " is not allowed, reason: " + decision);
}
// go over and remove it from the unassigned
@ -210,5 +234,6 @@ public class AllocateAllocationCommand implements AllocationCommand {
}
break;
}
return new RerouteExplanation(this, decision);
}
}

View File

@ -20,7 +20,9 @@
package org.elasticsearch.cluster.routing.allocation.command;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.routing.allocation.RerouteExplanation;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContent;
@ -68,10 +70,11 @@ public interface AllocationCommand {
* Writes an {@link AllocationCommand} using an {@link XContentBuilder}
* @param command {@link AllocationCommand} to write
* @param builder {@link XContentBuilder} to use
* @param params parameters to use when writing the command
* @param params parameters to use when writing the command
* @param objectName object the encoding should be encased in, null means a plain object
* @throws IOException if something happens during writing the command
*/
void toXContent(T command, XContentBuilder builder, ToXContent.Params params) throws IOException;
void toXContent(T command, XContentBuilder builder, ToXContent.Params params, @Nullable String objectName) throws IOException;
}
/**
@ -85,5 +88,5 @@ public interface AllocationCommand {
* @param allocation {@link RoutingAllocation} to modify
* @throws org.elasticsearch.ElasticsearchException if something happens during reconfiguration
*/
void execute(RoutingAllocation allocation) throws ElasticsearchException;
RerouteExplanation execute(RoutingAllocation allocation, boolean explain) throws ElasticsearchException;
}

View File

@ -19,10 +19,10 @@
package org.elasticsearch.cluster.routing.allocation.command;
import com.google.common.collect.Lists;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.cluster.routing.allocation.RoutingExplanations;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
@ -37,6 +37,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static com.google.common.collect.Lists.newArrayList;
/**
* A simple {@link AllocationCommand} composite managing several
* {@link AllocationCommand} implementations
@ -73,7 +75,7 @@ public class AllocationCommands {
registerFactory(MoveAllocationCommand.NAME, new MoveAllocationCommand.Factory());
}
private final List<AllocationCommand> commands = Lists.newArrayList();
private final List<AllocationCommand> commands = newArrayList();
/**
* Creates a new set of {@link AllocationCommands}
@ -111,10 +113,12 @@ public class AllocationCommands {
* @param allocation {@link RoutingAllocation} to apply this command to
* @throws org.elasticsearch.ElasticsearchException if something happens during execution
*/
public void execute(RoutingAllocation allocation) throws ElasticsearchException {
public RoutingExplanations execute(RoutingAllocation allocation, boolean explain) throws ElasticsearchException {
RoutingExplanations explanations = new RoutingExplanations();
for (AllocationCommand command : commands) {
command.execute(allocation);
explanations.add(command.execute(allocation, explain));
}
return explanations;
}
/**
@ -216,7 +220,7 @@ public class AllocationCommands {
for (AllocationCommand command : commands.commands) {
builder.startObject();
builder.field(command.name());
AllocationCommands.lookupFactorySafe(command.name()).toXContent(command, builder, params);
AllocationCommands.lookupFactorySafe(command.name()).toXContent(command, builder, params, null);
builder.endObject();
}
builder.endArray();

View File

@ -27,7 +27,9 @@ import org.elasticsearch.cluster.routing.MutableShardRouting;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.RerouteExplanation;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContent;
@ -105,9 +107,13 @@ public class CancelAllocationCommand implements AllocationCommand {
}
@Override
public void toXContent(CancelAllocationCommand command, XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startObject();
builder.field("index", command.shardId().index());
public void toXContent(CancelAllocationCommand command, XContentBuilder builder, ToXContent.Params params, String objectName) throws IOException {
if (objectName == null) {
builder.startObject();
} else {
builder.startObject(objectName);
}
builder.field("index", command.shardId().index().name());
builder.field("shard", command.shardId().id());
builder.field("node", command.node());
builder.field("allow_primary", command.allowPrimary());
@ -159,7 +165,7 @@ public class CancelAllocationCommand implements AllocationCommand {
}
@Override
public void execute(RoutingAllocation allocation) throws ElasticsearchException {
public RerouteExplanation execute(RoutingAllocation allocation, boolean explain) throws ElasticsearchException {
DiscoveryNode discoNode = allocation.nodes().resolveNode(node);
boolean found = false;
for (RoutingNodes.RoutingNodeIterator it = allocation.routingNodes().routingNodeIter(discoNode.id()); it.hasNext(); ) {
@ -187,7 +193,12 @@ public class CancelAllocationCommand implements AllocationCommand {
// the shard is relocating to another node, cancel the recovery on the other node, and deallocate this one
if (!allowPrimary && shardRouting.primary()) {
// can't cancel a primary shard being initialized
throw new ElasticsearchIllegalArgumentException("[cancel_allocation] can't cancel " + shardId + " on node " + discoNode + ", shard is primary and initializing its state");
if (explain) {
return new RerouteExplanation(this, allocation.decision(Decision.NO, "cancel_allocation_command",
"can't cancel " + shardId + " on node " + discoNode + ", shard is primary and initializing its state"));
}
throw new ElasticsearchIllegalArgumentException("[cancel_allocation] can't cancel " + shardId + " on node " +
discoNode + ", shard is primary and initializing its state");
}
it.moveToUnassigned();
// now, go and find the shard that is initializing on the target node, and cancel it as well...
@ -205,7 +216,12 @@ public class CancelAllocationCommand implements AllocationCommand {
// the shard is not relocating, its either started, or initializing, just cancel it and move on...
if (!allowPrimary && shardRouting.primary()) {
// can't cancel a primary shard being initialized
throw new ElasticsearchIllegalArgumentException("[cancel_allocation] can't cancel " + shardId + " on node " + discoNode + ", shard is primary and started");
if (explain) {
return new RerouteExplanation(this, allocation.decision(Decision.NO, "cancel_allocation_command",
"can't cancel " + shardId + " on node " + discoNode + ", shard is primary and started"));
}
throw new ElasticsearchIllegalArgumentException("[cancel_allocation] can't cancel " + shardId + " on node " +
discoNode + ", shard is primary and started");
}
it.remove();
allocation.routingNodes().unassigned().add(new MutableShardRouting(shardRouting.index(), shardRouting.id(),
@ -213,7 +229,13 @@ public class CancelAllocationCommand implements AllocationCommand {
}
}
if (!found) {
if (explain) {
return new RerouteExplanation(this, allocation.decision(Decision.NO, "cancel_allocation_command",
"can't cancel " + shardId + ", failed to find it on node " + discoNode));
}
throw new ElasticsearchIllegalArgumentException("[cancel_allocation] can't cancel " + shardId + ", failed to find it on node " + discoNode);
}
return new RerouteExplanation(this, allocation.decision(Decision.YES, "cancel_allocation_command",
"shard " + shardId + " on node " + discoNode + " can be cancelled"));
}
}

View File

@ -26,6 +26,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.MutableShardRouting;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.RerouteExplanation;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.io.stream.StreamInput;
@ -104,9 +105,13 @@ public class MoveAllocationCommand implements AllocationCommand {
}
@Override
public void toXContent(MoveAllocationCommand command, XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startObject();
builder.field("index", command.shardId().index());
public void toXContent(MoveAllocationCommand command, XContentBuilder builder, ToXContent.Params params, String objectName) throws IOException {
if (objectName == null) {
builder.startObject();
} else {
builder.startObject(objectName);
}
builder.field("index", command.shardId().index().name());
builder.field("shard", command.shardId().id());
builder.field("from_node", command.fromNode());
builder.field("to_node", command.toNode());
@ -142,9 +147,10 @@ public class MoveAllocationCommand implements AllocationCommand {
}
@Override
public void execute(RoutingAllocation allocation) throws ElasticsearchException {
public RerouteExplanation execute(RoutingAllocation allocation, boolean explain) throws ElasticsearchException {
DiscoveryNode fromDiscoNode = allocation.nodes().resolveNode(fromNode);
DiscoveryNode toDiscoNode = allocation.nodes().resolveNode(toNode);
Decision decision = null;
boolean found = false;
for (MutableShardRouting shardRouting : allocation.routingNodes().node(fromDiscoNode.id())) {
@ -155,12 +161,20 @@ public class MoveAllocationCommand implements AllocationCommand {
// TODO we can possibly support also relocating cases, where we cancel relocation and move...
if (!shardRouting.started()) {
throw new ElasticsearchIllegalArgumentException("[move_allocation] can't move " + shardId + ", shard is not started (state = " + shardRouting.state() + "]");
if (explain) {
return new RerouteExplanation(this, allocation.decision(Decision.NO, "move_allocation_command",
"shard " + shardId + " has not been started"));
}
throw new ElasticsearchIllegalArgumentException("[move_allocation] can't move " + shardId +
", shard is not started (state = " + shardRouting.state() + "]");
}
RoutingNode toRoutingNode = allocation.routingNodes().node(toDiscoNode.id());
Decision decision = allocation.deciders().canAllocate(shardRouting, toRoutingNode, allocation);
decision = allocation.deciders().canAllocate(shardRouting, toRoutingNode, allocation);
if (decision.type() == Decision.Type.NO) {
if (explain) {
return new RerouteExplanation(this, decision);
}
throw new ElasticsearchIllegalArgumentException("[move_allocation] can't move " + shardId + ", from " + fromDiscoNode + ", to " + toDiscoNode + ", since its not allowed, reason: " + decision);
}
if (decision.type() == Decision.Type.THROTTLE) {
@ -175,7 +189,12 @@ public class MoveAllocationCommand implements AllocationCommand {
}
if (!found) {
if (explain) {
return new RerouteExplanation(this, allocation.decision(Decision.NO,
"move_allocation_command", "shard " + shardId + " not found"));
}
throw new ElasticsearchIllegalArgumentException("[move_allocation] can't move " + shardId + ", failed to find it on node " + fromDiscoNode);
}
return new RerouteExplanation(this, decision);
}
}

View File

@ -82,6 +82,8 @@ import java.util.Map;
*/
public class AwarenessAllocationDecider extends AllocationDecider {
public static final String NAME = "awareness";
public static final String CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTES = "cluster.routing.allocation.awareness.attributes";
public static final String CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP = "cluster.routing.allocation.awareness.force.";
@ -168,7 +170,7 @@ public class AwarenessAllocationDecider extends AllocationDecider {
private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation, boolean moveToNode) {
if (awarenessAttributes.length == 0) {
return allocation.decision(Decision.YES, "no allocation awareness enabled");
return allocation.decision(Decision.YES, NAME, "no allocation awareness enabled");
}
IndexMetaData indexMetaData = allocation.metaData().index(shardRouting.index());
@ -176,7 +178,7 @@ public class AwarenessAllocationDecider extends AllocationDecider {
for (String awarenessAttribute : awarenessAttributes) {
// the node the shard exists on must be associated with an awareness attribute
if (!node.node().attributes().containsKey(awarenessAttribute)) {
return allocation.decision(Decision.NO, "node does not contain awareness attribute: [%s]", awarenessAttribute);
return allocation.decision(Decision.NO, NAME, "node does not contain awareness attribute: [%s]", awarenessAttribute);
}
// build attr_value -> nodes map
@ -234,7 +236,7 @@ public class AwarenessAllocationDecider extends AllocationDecider {
int currentNodeCount = shardPerAttribute.get(node.node().attributes().get(awarenessAttribute));
// if we are above with leftover, then we know we are not good, even with mod
if (currentNodeCount > (requiredCountPerAttribute + leftoverPerAttribute)) {
return allocation.decision(Decision.NO, "too many shards on nodes for attribute: [%s]", awarenessAttribute);
return allocation.decision(Decision.NO, NAME, "too many shards on nodes for attribute: [%s]", awarenessAttribute);
}
// all is well, we are below or same as average
if (currentNodeCount <= requiredCountPerAttribute) {
@ -242,6 +244,6 @@ public class AwarenessAllocationDecider extends AllocationDecider {
}
}
return allocation.decision(Decision.YES, "node meets awareness requirements");
return allocation.decision(Decision.YES, NAME, "node meets awareness requirements");
}
}

View File

@ -45,6 +45,8 @@ import java.util.Locale;
*/
public class ClusterRebalanceAllocationDecider extends AllocationDecider {
public static final String NAME = "cluster_rebalance";
/**
* An enum representation for the configured re-balance type.
*/
@ -87,27 +89,27 @@ public class ClusterRebalanceAllocationDecider extends AllocationDecider {
if (type == ClusterRebalanceType.INDICES_PRIMARIES_ACTIVE) {
// check if there are unassigned primaries.
if ( allocation.routingNodes().hasUnassignedPrimaries() ) {
return allocation.decision(Decision.NO, "cluster has unassigned primary shards");
return allocation.decision(Decision.NO, NAME, "cluster has unassigned primary shards");
}
// check if there are initializing primaries that don't have a relocatingNodeId entry.
if ( allocation.routingNodes().hasInactivePrimaries() ) {
return allocation.decision(Decision.NO, "cluster has inactive primary shards");
return allocation.decision(Decision.NO, NAME, "cluster has inactive primary shards");
}
return allocation.decision(Decision.YES, "all primary shards are active");
return allocation.decision(Decision.YES, NAME, "all primary shards are active");
}
if (type == ClusterRebalanceType.INDICES_ALL_ACTIVE) {
// check if there are unassigned shards.
if ( allocation.routingNodes().hasUnassignedShards() ) {
return allocation.decision(Decision.NO, "cluster has unassigned shards");
return allocation.decision(Decision.NO, NAME, "cluster has unassigned shards");
}
// in case all indices are assigned, are there initializing shards which
// are not relocating?
if ( allocation.routingNodes().hasInactiveShards() ) {
return allocation.decision(Decision.NO, "cluster has inactive shards");
return allocation.decision(Decision.NO, NAME, "cluster has inactive shards");
}
}
// type == Type.ALWAYS
return allocation.decision(Decision.YES, "all shards are active");
return allocation.decision(Decision.YES, NAME, "all shards are active");
}
}

View File

@ -39,6 +39,8 @@ import org.elasticsearch.node.settings.NodeSettingsService;
*/
public class ConcurrentRebalanceAllocationDecider extends AllocationDecider {
public static final String NAME = "concurrent_rebalance";
public static final String CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE = "cluster.routing.allocation.cluster_concurrent_rebalance";
class ApplySettings implements NodeSettingsService.Listener {
@ -65,12 +67,12 @@ public class ConcurrentRebalanceAllocationDecider extends AllocationDecider {
@Override
public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation allocation) {
if (clusterConcurrentRebalance == -1) {
return allocation.decision(Decision.YES, "all concurrent rebalances are allowed");
return allocation.decision(Decision.YES, NAME, "all concurrent rebalances are allowed");
}
if (allocation.routingNodes().getRelocatingShardCount() >= clusterConcurrentRebalance) {
return allocation.decision(Decision.NO, "too man concurrent rebalances [%d], limit: [%d]",
return allocation.decision(Decision.NO, NAME, "too man concurrent rebalances [%d], limit: [%d]",
allocation.routingNodes().getRelocatingShardCount(), clusterConcurrentRebalance);
}
return allocation.decision(Decision.YES, "below threshold [%d] for concurrent rebalances", clusterConcurrentRebalance);
return allocation.decision(Decision.YES, NAME, "below threshold [%d] for concurrent rebalances", clusterConcurrentRebalance);
}
}

View File

@ -20,7 +20,13 @@
package org.elasticsearch.cluster.routing.allocation.decider;
import com.google.common.collect.Lists;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.List;
import java.util.Locale;
@ -30,7 +36,7 @@ import java.util.Locale;
*
* @see AllocationDecider
*/
public abstract class Decision {
public abstract class Decision implements ToXContent {
public static final Decision ALWAYS = new Single(Type.YES);
public static final Decision YES = new Single(Type.YES);
@ -40,12 +46,52 @@ public abstract class Decision {
/**
* Creates a simple decision
* @param type {@link Type} of the decision
* @param label label for the Decider that produced this decision
* @param explanation explanation of the decision
* @param explanationParams additional parameters for the decision
* @return new {@link Decision} instance
*/
public static Decision single(Type type, String explanation, Object... explanationParams) {
return new Single(type, explanation, explanationParams);
public static Decision single(Type type, String label, String explanation, Object... explanationParams) {
return new Single(type, label, explanation, explanationParams);
}
public static void writeTo(Decision decision, StreamOutput out) throws IOException {
if (decision instanceof Multi) {
// Flag specifying whether it is a Multi or Single Decision
out.writeBoolean(true);
out.writeVInt(((Multi) decision).decisions.size());
for (Decision d : ((Multi) decision).decisions) {
writeTo(d, out);
}
} else {
// Flag specifying whether it is a Multi or Single Decision
out.writeBoolean(false);
Single d = ((Single) decision);
Type.writeTo(d.type, out);
out.writeOptionalString(d.label);
// Flatten explanation on serialization, so that explanationParams
// do not need to be serialized
out.writeOptionalString(d.getExplanation());
}
}
public static Decision readFrom(StreamInput in) throws IOException {
// Determine whether to read a Single or Multi Decision
if (in.readBoolean()) {
Multi result = new Multi();
int decisionCount = in.readVInt();
for (int i = 0; i < decisionCount; i++) {
Decision s = readFrom(in);
result.decisions.add(s);
}
return result;
} else {
Single result = new Single();
result.type = Type.readFrom(in);
result.label = in.readOptionalString();
result.explanationString = in.readOptionalString();
return result;
}
}
/**
@ -55,7 +101,40 @@ public abstract class Decision {
public static enum Type {
YES,
NO,
THROTTLE
THROTTLE;
public static Type resolve(String s) {
return Type.valueOf(s.toUpperCase(Locale.ROOT));
}
public static Type readFrom(StreamInput in) throws IOException {
int i = in.readVInt();
switch (i) {
case 0:
return NO;
case 1:
return YES;
case 2:
return THROTTLE;
default:
throw new ElasticsearchIllegalArgumentException("No Type for integer [" + i + "]");
}
}
public static void writeTo(Type type, StreamOutput out) throws IOException {
switch (type) {
case NO:
out.writeVInt(0);
break;
case YES:
out.writeVInt(1);
break;
case THROTTLE:
out.writeVInt(2);
default:
throw new ElasticsearchIllegalArgumentException("Invalid Type [" + type + "]");
}
}
}
/**
@ -64,20 +143,28 @@ public abstract class Decision {
*/
public abstract Type type();
public abstract String label();
/**
* Simple class representing a single decision
*/
public static class Single extends Decision {
private final Type type;
private final String explanation;
private final Object[] explanationParams;
private Type type;
private String label;
private String explanation;
private String explanationString;
private Object[] explanationParams;
public Single() {
}
/**
* Creates a new {@link Single} decision of a given type
* @param type {@link Type} of the decision
*/
public Single(Type type) {
this(type, null, (Object[]) null);
this(type, null, null, (Object[]) null);
}
/**
@ -87,8 +174,9 @@ public abstract class Decision {
* @param explanation An explanation of this {@link Decision}
* @param explanationParams A set of additional parameters
*/
public Single(Type type, String explanation, Object... explanationParams) {
public Single(Type type, String label, String explanation, Object... explanationParams) {
this.type = type;
this.label = label;
this.explanation = explanation;
this.explanationParams = explanationParams;
}
@ -98,12 +186,38 @@ public abstract class Decision {
return this.type;
}
@Override
public String label() {
return this.label;
}
/**
* Returns the explanation string, fully formatted. Only formats the string once
*/
public String getExplanation() {
if (explanationString == null && explanation != null) {
explanationString = String.format(Locale.ROOT, explanation, explanationParams);
}
return this.explanationString;
}
@Override
public String toString() {
if (explanation == null) {
return type + "()";
}
return type + "(" + String.format(Locale.ROOT, explanation, explanationParams) + ")";
return type + "(" + getExplanation() + ")";
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("decider", label);
builder.field("decision", type);
String explanation = getExplanation();
builder.field("explanation", explanation != null ? explanation : "none");
builder.endObject();
return builder;
}
}
@ -115,7 +229,7 @@ public abstract class Decision {
private final List<Decision> decisions = Lists.newArrayList();
/**
* Add a decission to this {@link Multi}decision instance
* Add a decision to this {@link Multi}decision instance
* @param decision {@link Decision} to add
* @return {@link Multi}decision instance with the given decision added
*/
@ -138,6 +252,12 @@ public abstract class Decision {
return ret;
}
@Override
public String label() {
// Multi decisions have no labels
return null;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
@ -146,5 +266,15 @@ public abstract class Decision {
}
return sb.toString();
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startArray("decisions");
for (Decision d : decisions) {
d.toXContent(builder, params);
}
builder.endArray();
return builder;
}
}
}

View File

@ -56,6 +56,8 @@ import org.elasticsearch.node.settings.NodeSettingsService;
@Deprecated
public class DisableAllocationDecider extends AllocationDecider {
public static final String NAME = "disable";
public static final String CLUSTER_ROUTING_ALLOCATION_DISABLE_NEW_ALLOCATION = "cluster.routing.allocation.disable_new_allocation";
public static final String CLUSTER_ROUTING_ALLOCATION_DISABLE_ALLOCATION = "cluster.routing.allocation.disable_allocation";
public static final String CLUSTER_ROUTING_ALLOCATION_DISABLE_REPLICA_ALLOCATION = "cluster.routing.allocation.disable_replica_allocation";
@ -104,28 +106,28 @@ public class DisableAllocationDecider extends AllocationDecider {
@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
if (allocation.ignoreDisable()) {
return allocation.decision(Decision.YES, "allocation disabling is ignored");
return allocation.decision(Decision.YES, NAME, "allocation disabling is ignored");
}
Settings indexSettings = allocation.routingNodes().metaData().index(shardRouting.index()).settings();
if (shardRouting.primary() && !allocation.routingNodes().routingTable().index(shardRouting.index()).shard(shardRouting.id()).primaryAllocatedPostApi()) {
// if its primary, and it hasn't been allocated post API (meaning its a "fresh newly created shard"), only disable allocation
// on a special disable allocation flag
if (indexSettings.getAsBoolean(INDEX_ROUTING_ALLOCATION_DISABLE_NEW_ALLOCATION, disableNewAllocation)) {
return allocation.decision(Decision.NO, "new primary allocation is disabled");
return allocation.decision(Decision.NO, NAME, "new primary allocation is disabled");
} else {
return allocation.decision(Decision.YES, "new primary allocation is enabled");
return allocation.decision(Decision.YES, NAME, "new primary allocation is enabled");
}
}
if (indexSettings.getAsBoolean(INDEX_ROUTING_ALLOCATION_DISABLE_ALLOCATION, disableAllocation)) {
return allocation.decision(Decision.NO, "all allocation is disabled");
return allocation.decision(Decision.NO, NAME, "all allocation is disabled");
}
if (indexSettings.getAsBoolean(INDEX_ROUTING_ALLOCATION_DISABLE_REPLICA_ALLOCATION, disableReplicaAllocation)) {
if (shardRouting.primary()) {
return allocation.decision(Decision.YES, "primary allocation is enabled");
return allocation.decision(Decision.YES, NAME, "primary allocation is enabled");
} else {
return allocation.decision(Decision.NO, "replica allocation is disabled");
return allocation.decision(Decision.NO, NAME, "replica allocation is disabled");
}
}
return allocation.decision(Decision.YES, "all allocation is enabled");
return allocation.decision(Decision.YES, NAME, "all allocation is enabled");
}
}

View File

@ -59,6 +59,8 @@ import static org.elasticsearch.cluster.InternalClusterInfoService.shardIdentifi
*/
public class DiskThresholdDecider extends AllocationDecider {
public static final String NAME = "disk_threshold";
private volatile Double freeDiskThresholdLow;
private volatile Double freeDiskThresholdHigh;
private volatile ByteSizeValue freeBytesThresholdLow;
@ -129,11 +131,11 @@ public class DiskThresholdDecider extends AllocationDecider {
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
if (!enabled) {
return allocation.decision(Decision.YES, "disk threshold decider disabled");
return allocation.decision(Decision.YES, NAME, "disk threshold decider disabled");
}
// Allow allocation regardless if only a single node is available
if (allocation.nodes().size() <= 1) {
return allocation.decision(Decision.YES, "only a single node is present");
return allocation.decision(Decision.YES, NAME, "only a single node is present");
}
ClusterInfo clusterInfo = allocation.clusterInfo();
@ -141,7 +143,7 @@ public class DiskThresholdDecider extends AllocationDecider {
if (logger.isTraceEnabled()) {
logger.trace("Cluster info unavailable for disk threshold decider, allowing allocation.");
}
return allocation.decision(Decision.YES, "cluster info unavailable");
return allocation.decision(Decision.YES, NAME, "cluster info unavailable");
}
Map<String, DiskUsage> usages = clusterInfo.getNodeDiskUsages();
@ -150,7 +152,7 @@ public class DiskThresholdDecider extends AllocationDecider {
if (logger.isTraceEnabled()) {
logger.trace("Unable to determine disk usages for disk-aware allocation, allowing allocation");
}
return allocation.decision(Decision.YES, "disk usages unavailable");
return allocation.decision(Decision.YES, NAME, "disk usages unavailable");
}
DiskUsage usage = usages.get(node.nodeId());
@ -175,7 +177,7 @@ public class DiskThresholdDecider extends AllocationDecider {
logger.debug("Less than the required {} free bytes threshold ({} bytes free) on node {}, preventing allocation",
freeBytesThresholdLow, freeBytes, node.nodeId());
}
return allocation.decision(Decision.NO, "less than required [%s] free on node, free: [%s]",
return allocation.decision(Decision.NO, NAME, "less than required [%s] free on node, free: [%s]",
freeBytesThresholdLow, new ByteSizeValue(freeBytes));
}
if (freeDiskPercentage < freeDiskThresholdLow) {
@ -183,7 +185,7 @@ public class DiskThresholdDecider extends AllocationDecider {
logger.debug("Less than the required {}% free disk threshold ({}% free) on node [{}], preventing allocation",
freeDiskThresholdLow, freeDiskPercentage, node.nodeId());
}
return allocation.decision(Decision.NO, "less than required [%d%%] free disk on node, free: [%d%%]",
return allocation.decision(Decision.NO, NAME, "less than required [%d%%] free disk on node, free: [%d%%]",
freeDiskThresholdLow, freeDiskThresholdLow);
}
@ -195,26 +197,26 @@ public class DiskThresholdDecider extends AllocationDecider {
if (freeBytesAfterShard < freeBytesThresholdHigh.bytes()) {
logger.warn("After allocating, node [{}] would have less than the required {} free bytes threshold ({} bytes free), preventing allocation",
node.nodeId(), freeBytesThresholdHigh, freeBytesAfterShard);
return allocation.decision(Decision.NO, "after allocation less than required [%s] free on node, free: [%s]",
return allocation.decision(Decision.NO, NAME, "after allocation less than required [%s] free on node, free: [%s]",
freeBytesThresholdLow, new ByteSizeValue(freeBytesAfterShard));
}
if (freeSpaceAfterShard < freeDiskThresholdHigh) {
logger.warn("After allocating, node [{}] would have less than the required {}% free disk threshold ({}% free), preventing allocation",
node.nodeId(), freeDiskThresholdHigh, freeSpaceAfterShard);
return allocation.decision(Decision.NO, "after allocation less than required [%d%%] free disk on node, free: [%d%%]",
return allocation.decision(Decision.NO, NAME, "after allocation less than required [%d%%] free disk on node, free: [%d%%]",
freeDiskThresholdLow, freeSpaceAfterShard);
}
return allocation.decision(Decision.YES, "enough disk for shard on node, free: [%s]", new ByteSizeValue(freeBytes));
return allocation.decision(Decision.YES, NAME, "enough disk for shard on node, free: [%s]", new ByteSizeValue(freeBytes));
}
public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
if (!enabled) {
return allocation.decision(Decision.YES, "disk threshold decider disabled");
return allocation.decision(Decision.YES, NAME, "disk threshold decider disabled");
}
// Allow allocation regardless if only a single node is available
if (allocation.nodes().size() <= 1) {
return allocation.decision(Decision.YES, "only a single node is present");
return allocation.decision(Decision.YES, NAME, "only a single node is present");
}
ClusterInfo clusterInfo = allocation.clusterInfo();
@ -222,7 +224,7 @@ public class DiskThresholdDecider extends AllocationDecider {
if (logger.isTraceEnabled()) {
logger.trace("Cluster info unavailable for disk threshold decider, allowing allocation.");
}
return allocation.decision(Decision.YES, "cluster info unavailable");
return allocation.decision(Decision.YES, NAME, "cluster info unavailable");
}
Map<String, DiskUsage> usages = clusterInfo.getNodeDiskUsages();
@ -230,7 +232,7 @@ public class DiskThresholdDecider extends AllocationDecider {
if (logger.isTraceEnabled()) {
logger.trace("Unable to determine disk usages for disk-aware allocation, allowing allocation");
}
return allocation.decision(Decision.YES, "disk usages unavailable");
return allocation.decision(Decision.YES, NAME, "disk usages unavailable");
}
DiskUsage usage = usages.get(node.nodeId());
@ -255,7 +257,7 @@ public class DiskThresholdDecider extends AllocationDecider {
logger.debug("Less than the required {} free bytes threshold ({} bytes free) on node {}, shard cannot remain",
freeBytesThresholdHigh, freeBytes, node.nodeId());
}
return allocation.decision(Decision.NO, "after allocation less than required [%s] free on node, free: [%s]",
return allocation.decision(Decision.NO, NAME, "after allocation less than required [%s] free on node, free: [%s]",
freeBytesThresholdHigh, new ByteSizeValue(freeBytes));
}
if (freeDiskPercentage < freeDiskThresholdHigh) {
@ -263,11 +265,11 @@ public class DiskThresholdDecider extends AllocationDecider {
logger.debug("Less than the required {}% free disk threshold ({}% free) on node {}, shard cannot remain",
freeDiskThresholdHigh, freeDiskPercentage, node.nodeId());
}
return allocation.decision(Decision.NO, "after allocation less than required [%d%%] free disk on node, free: [%d%%]",
return allocation.decision(Decision.NO, NAME, "after allocation less than required [%d%%] free disk on node, free: [%d%%]",
freeDiskThresholdHigh, freeDiskPercentage);
}
return allocation.decision(Decision.YES, "enough disk for shard to remain on node, free: [%s]", new ByteSizeValue(freeBytes));
return allocation.decision(Decision.YES, NAME, "enough disk for shard to remain on node, free: [%s]", new ByteSizeValue(freeBytes));
}
/**

View File

@ -45,6 +45,8 @@ import java.util.Locale;
*/
public class EnableAllocationDecider extends AllocationDecider implements NodeSettingsService.Listener {
public static final String NAME = "enable";
public static final String CLUSTER_ROUTING_ALLOCATION_ENABLE = "cluster.routing.allocation.enable";
public static final String INDEX_ROUTING_ALLOCATION_ENABLE = "index.routing.allocation.enable";
@ -60,7 +62,7 @@ public class EnableAllocationDecider extends AllocationDecider implements NodeSe
@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
if (allocation.ignoreDisable()) {
return allocation.decision(Decision.YES, "allocation disabling is ignored");
return allocation.decision(Decision.YES, NAME, "allocation disabling is ignored");
}
Settings indexSettings = allocation.routingNodes().metaData().index(shardRouting.index()).settings();
@ -73,20 +75,20 @@ public class EnableAllocationDecider extends AllocationDecider implements NodeSe
}
switch (enable) {
case ALL:
return allocation.decision(Decision.YES, "all allocations are allowed");
return allocation.decision(Decision.YES, NAME, "all allocations are allowed");
case NONE:
return allocation.decision(Decision.NO, "no allocations are allowed");
return allocation.decision(Decision.NO, NAME, "no allocations are allowed");
case NEW_PRIMARIES:
if (shardRouting.primary() && !allocation.routingNodes().routingTable().index(shardRouting.index()).shard(shardRouting.id()).primaryAllocatedPostApi()) {
return allocation.decision(Decision.YES, "new primary allocations are allowed");
return allocation.decision(Decision.YES, NAME, "new primary allocations are allowed");
} else {
return allocation.decision(Decision.NO, "non-new primary allocations are disallowed");
return allocation.decision(Decision.NO, NAME, "non-new primary allocations are disallowed");
}
case PRIMARIES:
if (shardRouting.primary()) {
return allocation.decision(Decision.YES, "primary allocations are allowed");
return allocation.decision(Decision.YES, NAME, "primary allocations are allowed");
} else {
return allocation.decision(Decision.NO, "replica allocations are disallowed");
return allocation.decision(Decision.NO, NAME, "replica allocations are disallowed");
}
default:
throw new ElasticsearchIllegalStateException("Unknown allocation option");

View File

@ -60,6 +60,8 @@ import static org.elasticsearch.cluster.node.DiscoveryNodeFilters.OpType.OR;
*/
public class FilterAllocationDecider extends AllocationDecider {
public static final String NAME = "filter";
public static final String INDEX_ROUTING_REQUIRE_GROUP = "index.routing.allocation.require.";
public static final String INDEX_ROUTING_INCLUDE_GROUP = "index.routing.allocation.include.";
public static final String INDEX_ROUTING_EXCLUDE_GROUP = "index.routing.allocation.exclude.";
@ -109,38 +111,38 @@ public class FilterAllocationDecider extends AllocationDecider {
private Decision shouldFilter(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
if (clusterRequireFilters != null) {
if (!clusterRequireFilters.match(node.node())) {
return allocation.decision(Decision.NO, "node does not match global required filters [%s]", clusterRequireFilters);
return allocation.decision(Decision.NO, NAME, "node does not match global required filters [%s]", clusterRequireFilters);
}
}
if (clusterIncludeFilters != null) {
if (!clusterIncludeFilters.match(node.node())) {
return allocation.decision(Decision.NO, "node does not match global include filters [%s]", clusterIncludeFilters);
return allocation.decision(Decision.NO, NAME, "node does not match global include filters [%s]", clusterIncludeFilters);
}
}
if (clusterExcludeFilters != null) {
if (clusterExcludeFilters.match(node.node())) {
return allocation.decision(Decision.NO, "node matches global exclude filters [%s]", clusterExcludeFilters);
return allocation.decision(Decision.NO, NAME, "node matches global exclude filters [%s]", clusterExcludeFilters);
}
}
IndexMetaData indexMd = allocation.routingNodes().metaData().index(shardRouting.index());
if (indexMd.requireFilters() != null) {
if (!indexMd.requireFilters().match(node.node())) {
return allocation.decision(Decision.NO, "node does not match index required filters [%s]", indexMd.requireFilters());
return allocation.decision(Decision.NO, NAME, "node does not match index required filters [%s]", indexMd.requireFilters());
}
}
if (indexMd.includeFilters() != null) {
if (!indexMd.includeFilters().match(node.node())) {
return allocation.decision(Decision.NO, "node does not match index include filters [%s]", indexMd.includeFilters());
return allocation.decision(Decision.NO, NAME, "node does not match index include filters [%s]", indexMd.includeFilters());
}
}
if (indexMd.excludeFilters() != null) {
if (indexMd.excludeFilters().match(node.node())) {
return allocation.decision(Decision.NO, "node matches index exclude filters [%s]", indexMd.excludeFilters());
return allocation.decision(Decision.NO, NAME, "node matches index exclude filters [%s]", indexMd.excludeFilters());
}
}
return allocation.decision(Decision.YES, "node passes include/exclude/require filters");
return allocation.decision(Decision.YES, NAME, "node passes include/exclude/require filters");
}
class ApplySettings implements NodeSettingsService.Listener {

View File

@ -36,6 +36,8 @@ import org.elasticsearch.common.settings.Settings;
*/
public class NodeVersionAllocationDecider extends AllocationDecider {
public static final String NAME = "node_version";
@Inject
public NodeVersionAllocationDecider(Settings settings) {
super(settings);
@ -49,11 +51,11 @@ public class NodeVersionAllocationDecider extends AllocationDecider {
if (sourceNodeId == null) { // we allocate - check primary
if (shardRouting.primary()) {
// we are the primary we can allocate wherever
return allocation.decision(Decision.YES, "primary shard can be allocated anywhere");
return allocation.decision(Decision.YES, NAME, "primary shard can be allocated anywhere");
}
final MutableShardRouting primary = allocation.routingNodes().activePrimary(shardRouting);
if (primary == null) { // we have a primary - it's a start ;)
return allocation.decision(Decision.YES, "no active primary shard yet");
return allocation.decision(Decision.YES, NAME, "no active primary shard yet");
}
sourceNodeId = primary.currentNodeId();
}
@ -67,10 +69,10 @@ public class NodeVersionAllocationDecider extends AllocationDecider {
/* we can allocate if we can recover from a node that is younger or on the same version
* if the primary is already running on a newer version that won't work due to possible
* differences in the lucene index format etc.*/
return allocation.decision(Decision.YES, "target node version [%s] is same or newer than source node version [%s]",
return allocation.decision(Decision.YES, NAME, "target node version [%s] is same or newer than source node version [%s]",
target.node().version(), source.node().version());
} else {
return allocation.decision(Decision.NO, "target node version [%s] is older than source node version [%s]",
return allocation.decision(Decision.NO, NAME, "target node version [%s] is older than source node version [%s]",
target.node().version(), source.node().version());
}
}

View File

@ -29,6 +29,8 @@ import org.elasticsearch.common.settings.Settings;
*/
public class RebalanceOnlyWhenActiveAllocationDecider extends AllocationDecider {
public static final String NAME = "rebalance_only_when_active";
@Inject
public RebalanceOnlyWhenActiveAllocationDecider(Settings settings) {
super(settings);
@ -39,8 +41,8 @@ public class RebalanceOnlyWhenActiveAllocationDecider extends AllocationDecider
// its ok to check for active here, since in relocation, a shard is split into two in routing
// nodes, once relocating, and one initializing
if (!allocation.routingNodes().allReplicasActive(shardRouting)) {
return allocation.decision(Decision.NO, "not all replicas are active in cluster");
return allocation.decision(Decision.NO, NAME, "not all replicas are active in cluster");
}
return allocation.decision(Decision.YES, "all replicas are active in cluster");
return allocation.decision(Decision.YES, NAME, "all replicas are active in cluster");
}
}

View File

@ -31,6 +31,8 @@ import org.elasticsearch.common.settings.Settings;
*/
public class ReplicaAfterPrimaryActiveAllocationDecider extends AllocationDecider {
private static final String NAME = "replica_after_primary_active";
@Inject
public ReplicaAfterPrimaryActiveAllocationDecider(Settings settings) {
super(settings);
@ -43,12 +45,12 @@ public class ReplicaAfterPrimaryActiveAllocationDecider extends AllocationDecide
public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocation) {
if (shardRouting.primary()) {
return allocation.decision(Decision.YES, "shard is primary");
return allocation.decision(Decision.YES, NAME, "shard is primary");
}
MutableShardRouting primary = allocation.routingNodes().activePrimary(shardRouting);
if (primary == null) {
return allocation.decision(Decision.NO, "primary shard is not yet active");
return allocation.decision(Decision.NO, NAME, "primary shard is not yet active");
}
return allocation.decision(Decision.YES, "primary is already active");
return allocation.decision(Decision.YES, NAME, "primary is already active");
}
}

View File

@ -44,6 +44,8 @@ import org.elasticsearch.common.settings.Settings;
*/
public class SameShardAllocationDecider extends AllocationDecider {
public static final String NAME = "same_shard";
public static final String SAME_HOST_SETTING = "cluster.routing.allocation.same_shard.host";
private final boolean sameHost;
@ -60,7 +62,7 @@ public class SameShardAllocationDecider extends AllocationDecider {
Iterable<MutableShardRouting> assignedShards = allocation.routingNodes().assignedShards(shardRouting);
for (MutableShardRouting assignedShard : assignedShards) {
if (node.nodeId().equals(assignedShard.currentNodeId())) {
return allocation.decision(Decision.NO, "shard cannot be allocated on same node [%s] it already exists on", node.nodeId());
return allocation.decision(Decision.NO, NAME, "shard cannot be allocated on same node [%s] it already exists on", node.nodeId());
}
}
if (sameHost) {
@ -83,14 +85,14 @@ public class SameShardAllocationDecider extends AllocationDecider {
if (checkNodeOnSameHost) {
for (MutableShardRouting assignedShard : assignedShards) {
if (checkNode.nodeId().equals(assignedShard.currentNodeId())) {
return allocation.decision(Decision.NO, "shard cannot be allocated on same host [%s] it already exists on",
node.nodeId());
return allocation.decision(Decision.NO, NAME,
"shard cannot be allocated on same host [%s] it already exists on", node.nodeId());
}
}
}
}
}
}
return allocation.decision(Decision.YES, "shard is not allocated to same node or host");
return allocation.decision(Decision.YES, NAME, "shard is not allocated to same node or host");
}
}

View File

@ -49,6 +49,8 @@ import org.elasticsearch.common.settings.Settings;
*/
public class ShardsLimitAllocationDecider extends AllocationDecider {
public static final String NAME = "shards_limit";
/**
* Controls the maximum number of shards per index on a single Elasticsearch
* node. Negative values are interpreted as unlimited.
@ -65,7 +67,7 @@ public class ShardsLimitAllocationDecider extends AllocationDecider {
IndexMetaData indexMd = allocation.routingNodes().metaData().index(shardRouting.index());
int totalShardsPerNode = indexMd.settings().getAsInt(INDEX_TOTAL_SHARDS_PER_NODE, -1);
if (totalShardsPerNode <= 0) {
return allocation.decision(Decision.YES, "total shard limit disabled: [%d] <= 0", totalShardsPerNode);
return allocation.decision(Decision.YES, NAME, "total shard limit disabled: [%d] <= 0", totalShardsPerNode);
}
int nodeCount = 0;
@ -80,10 +82,10 @@ public class ShardsLimitAllocationDecider extends AllocationDecider {
nodeCount++;
}
if (nodeCount >= totalShardsPerNode) {
return allocation.decision(Decision.NO, "too many shards for this index on node [%d], limit: [%d]",
return allocation.decision(Decision.NO, NAME, "too many shards for this index on node [%d], limit: [%d]",
nodeCount, totalShardsPerNode);
}
return allocation.decision(Decision.YES, "shard count under limit [%d] of total shards per node", totalShardsPerNode);
return allocation.decision(Decision.YES, NAME, "shard count under limit [%d] of total shards per node", totalShardsPerNode);
}
@Override
@ -91,7 +93,7 @@ public class ShardsLimitAllocationDecider extends AllocationDecider {
IndexMetaData indexMd = allocation.routingNodes().metaData().index(shardRouting.index());
int totalShardsPerNode = indexMd.settings().getAsInt(INDEX_TOTAL_SHARDS_PER_NODE, -1);
if (totalShardsPerNode <= 0) {
return allocation.decision(Decision.YES, "total shard limit disabled: [%d] <= 0", totalShardsPerNode);
return allocation.decision(Decision.YES, NAME, "total shard limit disabled: [%d] <= 0", totalShardsPerNode);
}
int nodeCount = 0;
@ -106,9 +108,9 @@ public class ShardsLimitAllocationDecider extends AllocationDecider {
nodeCount++;
}
if (nodeCount > totalShardsPerNode) {
return allocation.decision(Decision.NO, "too many shards for this index on node [%d], limit: [%d]",
return allocation.decision(Decision.NO, NAME, "too many shards for this index on node [%d], limit: [%d]",
nodeCount, totalShardsPerNode);
}
return allocation.decision(Decision.YES, "shard count under limit [%d] of total shards per node", totalShardsPerNode);
return allocation.decision(Decision.YES, NAME, "shard count under limit [%d] of total shards per node", totalShardsPerNode);
}
}

View File

@ -34,6 +34,8 @@ import org.elasticsearch.node.settings.NodeSettingsService;
*/
public class SnapshotInProgressAllocationDecider extends AllocationDecider {
public static final String NAME = "snapshot_in_progress";
/**
* Disables relocation of shards that are currently being snapshotted.
*/
@ -99,19 +101,19 @@ public class SnapshotInProgressAllocationDecider extends AllocationDecider {
SnapshotMetaData snapshotMetaData = allocation.metaData().custom(SnapshotMetaData.TYPE);
if (snapshotMetaData == null) {
// Snapshots are not running
return allocation.decision(Decision.YES, "no snapshots are currently running");
return allocation.decision(Decision.YES, NAME, "no snapshots are currently running");
}
for (SnapshotMetaData.Entry snapshot : snapshotMetaData.entries()) {
SnapshotMetaData.ShardSnapshotStatus shardSnapshotStatus = snapshot.shards().get(shardRouting.shardId());
if (shardSnapshotStatus != null && !shardSnapshotStatus.state().completed() && shardSnapshotStatus.nodeId() != null && shardSnapshotStatus.nodeId().equals(shardRouting.currentNodeId())) {
logger.trace("Preventing snapshotted shard [{}] to be moved from node [{}]", shardRouting.shardId(), shardSnapshotStatus.nodeId());
return allocation.decision(Decision.NO, "snapshot for shard [%s] is currently running on node [%s]",
return allocation.decision(Decision.NO, NAME, "snapshot for shard [%s] is currently running on node [%s]",
shardRouting.shardId(), shardSnapshotStatus.nodeId());
}
}
}
return allocation.decision(Decision.YES, "shard not primary or relocation disabled");
return allocation.decision(Decision.YES, NAME, "shard not primary or relocation disabled");
}
}

View File

@ -50,6 +50,8 @@ import org.elasticsearch.node.settings.NodeSettingsService;
*/
public class ThrottlingAllocationDecider extends AllocationDecider {
public static final String NAME = "throttling";
public static final String CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES = "cluster.routing.allocation.node_initial_primaries_recoveries";
public static final String CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES = "cluster.routing.allocation.node_concurrent_recoveries";
public static final int DEFAULT_CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES = 2;
@ -85,10 +87,10 @@ public class ThrottlingAllocationDecider extends AllocationDecider {
}
}
if (primariesInRecovery >= primariesInitialRecoveries) {
return allocation.decision(Decision.THROTTLE, "too many primaries currently recovering [%d], limit: [%d]",
return allocation.decision(Decision.THROTTLE, NAME, "too many primaries currently recovering [%d], limit: [%d]",
primariesInRecovery, primariesInitialRecoveries);
} else {
return allocation.decision(Decision.YES, "below primary recovery limit of [%d]", primariesInitialRecoveries);
return allocation.decision(Decision.YES, NAME, "below primary recovery limit of [%d]", primariesInitialRecoveries);
}
}
}
@ -107,10 +109,10 @@ public class ThrottlingAllocationDecider extends AllocationDecider {
}
}
if (currentRecoveries >= concurrentRecoveries) {
return allocation.decision(Decision.THROTTLE, "too many shards currently recovering [%d], limit: [%d]",
return allocation.decision(Decision.THROTTLE, NAME, "too many shards currently recovering [%d], limit: [%d]",
currentRecoveries, concurrentRecoveries);
} else {
return allocation.decision(Decision.YES, "below shard recovery limit of [%d]", concurrentRecoveries);
return allocation.decision(Decision.YES, NAME, "below shard recovery limit of [%d]", concurrentRecoveries);
}
}

View File

@ -26,6 +26,7 @@ import org.elasticsearch.client.Requests;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.rest.*;
@ -50,6 +51,7 @@ public class RestClusterRerouteAction extends BaseRestHandler {
final ClusterRerouteRequest clusterRerouteRequest = Requests.clusterRerouteRequest();
clusterRerouteRequest.listenerThreaded(false);
clusterRerouteRequest.dryRun(request.paramAsBoolean("dry_run", clusterRerouteRequest.dryRun()));
clusterRerouteRequest.explain(request.paramAsBoolean("explain", clusterRerouteRequest.explain()));
clusterRerouteRequest.timeout(request.paramAsTime("timeout", clusterRerouteRequest.timeout()));
clusterRerouteRequest.masterNodeTimeout(request.paramAsTime("master_timeout", clusterRerouteRequest.masterNodeTimeout()));
if (request.hasContent()) {
@ -75,6 +77,10 @@ public class RestClusterRerouteAction extends BaseRestHandler {
}
response.getState().settingsFilter(settingsFilter).toXContent(builder, request);
builder.endObject();
if (clusterRerouteRequest.explain()) {
assert response.getExplanations() != null;
response.getExplanations().toXContent(builder, ToXContent.EMPTY_PARAMS);
}
}
@Override

View File

@ -21,10 +21,14 @@ package org.elasticsearch.cluster.allocation;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.RerouteExplanation;
import org.elasticsearch.cluster.routing.allocation.RoutingExplanations;
import org.elasticsearch.cluster.routing.allocation.command.AllocateAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.cluster.routing.allocation.decider.DisableAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.common.Priority;
@ -84,6 +88,7 @@ public class ClusterRerouteTests extends ElasticsearchIntegrationTest {
logger.info("--> explicitly allocate shard 1, *under dry_run*");
state = client().admin().cluster().prepareReroute()
.setExplain(randomBoolean())
.add(new AllocateAllocationCommand(new ShardId("test", 0), node_1, true))
.setDryRun(true)
.execute().actionGet().getState();
@ -96,6 +101,7 @@ public class ClusterRerouteTests extends ElasticsearchIntegrationTest {
logger.info("--> explicitly allocate shard 1, actually allocating, no dry run");
state = client().admin().cluster().prepareReroute()
.setExplain(randomBoolean())
.add(new AllocateAllocationCommand(new ShardId("test", 0), node_1, true))
.execute().actionGet().getState();
assertThat(state.routingNodes().unassigned().size(), equalTo(1));
@ -111,6 +117,7 @@ public class ClusterRerouteTests extends ElasticsearchIntegrationTest {
logger.info("--> move shard 1 primary from node1 to node2");
state = client().admin().cluster().prepareReroute()
.setExplain(randomBoolean())
.add(new MoveAllocationCommand(new ShardId("test", 0), node_1, node_2))
.execute().actionGet().getState();
@ -164,6 +171,7 @@ public class ClusterRerouteTests extends ElasticsearchIntegrationTest {
logger.info("--> explicitly allocate shard 1, actually allocating, no dry run");
state = client().admin().cluster().prepareReroute()
.setExplain(randomBoolean())
.add(new AllocateAllocationCommand(new ShardId("test", 0), node_1, true))
.execute().actionGet().getState();
assertThat(state.routingNodes().unassigned().size(), equalTo(1));
@ -197,6 +205,7 @@ public class ClusterRerouteTests extends ElasticsearchIntegrationTest {
assertThat(client().admin().cluster().prepareHealth().setWaitForNodes("2").execute().actionGet().getStatus(), equalTo(ClusterHealthStatus.RED));
logger.info("--> explicitly allocate primary");
state = client().admin().cluster().prepareReroute()
.setExplain(randomBoolean())
.add(new AllocateAllocationCommand(new ShardId("test", 0), node_1, true))
.execute().actionGet().getState();
assertThat(state.routingNodes().unassigned().size(), equalTo(1));
@ -212,4 +221,47 @@ public class ClusterRerouteTests extends ElasticsearchIntegrationTest {
}
@Test
public void rerouteExplain() {
Settings commonSettings = settingsBuilder().build();
logger.info("--> starting a node");
String node_1 = cluster().startNode(commonSettings);
assertThat(cluster().size(), equalTo(1));
ClusterHealthResponse healthResponse = client().admin().cluster().prepareHealth().setWaitForNodes("1").execute().actionGet();
assertThat(healthResponse.isTimedOut(), equalTo(false));
logger.info("--> create an index with 1 shard");
client().admin().indices().prepareCreate("test")
.setSettings(settingsBuilder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0))
.execute().actionGet();
ensureGreen("test");
logger.info("--> disable allocation");
Settings newSettings = settingsBuilder()
.put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE, EnableAllocationDecider.Allocation.NONE.name())
.build();
client().admin().cluster().prepareUpdateSettings().setTransientSettings(newSettings).execute().actionGet();
logger.info("--> starting a second node");
String node_2 = cluster().startNode(commonSettings);
assertThat(cluster().size(), equalTo(2));
healthResponse = client().admin().cluster().prepareHealth().setWaitForNodes("2").execute().actionGet();
assertThat(healthResponse.isTimedOut(), equalTo(false));
logger.info("--> try to move the shard from node1 to node2");
MoveAllocationCommand cmd = new MoveAllocationCommand(new ShardId("test", 0), node_1, node_2);
ClusterRerouteResponse resp = client().admin().cluster().prepareReroute().add(cmd).setExplain(true).execute().actionGet();
RoutingExplanations e = resp.getExplanations();
assertThat(e.explanations().size(), equalTo(1));
RerouteExplanation explanation = e.explanations().get(0);
assertThat(explanation.command().name(), equalTo(cmd.name()));
assertThat(((MoveAllocationCommand)explanation.command()).shardId(), equalTo(cmd.shardId()));
assertThat(((MoveAllocationCommand)explanation.command()).fromNode(), equalTo(cmd.fromNode()));
assertThat(((MoveAllocationCommand)explanation.command()).toNode(), equalTo(cmd.toNode()));
assertThat(explanation.decisions().type(), equalTo(Decision.Type.YES));
}
}