diff --git a/buildSrc/src/main/resources/checkstyle_suppressions.xml b/buildSrc/src/main/resources/checkstyle_suppressions.xml index 3bce1aece1d..673f1d5dc61 100644 --- a/buildSrc/src/main/resources/checkstyle_suppressions.xml +++ b/buildSrc/src/main/resources/checkstyle_suppressions.xml @@ -326,17 +326,6 @@ - - - - - - - - - - - diff --git a/core/src/main/java/org/elasticsearch/action/ActionModule.java b/core/src/main/java/org/elasticsearch/action/ActionModule.java index 2b33a669428..a659e60f501 100644 --- a/core/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/core/src/main/java/org/elasticsearch/action/ActionModule.java @@ -19,6 +19,8 @@ package org.elasticsearch.action; +import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainAction; +import org.elasticsearch.action.admin.cluster.allocation.TransportClusterAllocationExplainAction; import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction; import org.elasticsearch.action.admin.cluster.health.TransportClusterHealthAction; import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsAction; @@ -263,6 +265,7 @@ public class ActionModule extends AbstractModule { registerAction(ListTasksAction.INSTANCE, TransportListTasksAction.class); registerAction(CancelTasksAction.INSTANCE, TransportCancelTasksAction.class); + registerAction(ClusterAllocationExplainAction.INSTANCE, TransportClusterAllocationExplainAction.class); registerAction(ClusterStatsAction.INSTANCE, TransportClusterStatsAction.class); registerAction(ClusterStateAction.INSTANCE, TransportClusterStateAction.class); registerAction(ClusterHealthAction.INSTANCE, TransportClusterHealthAction.class); diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainAction.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainAction.java new file mode 100644 index 00000000000..d34ac63602d --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainAction.java @@ -0,0 +1,48 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.cluster.allocation; + +import org.elasticsearch.action.Action; +import org.elasticsearch.client.ElasticsearchClient; + +/** + * Action for explaining shard allocation for a shard in the cluster + */ +public class ClusterAllocationExplainAction extends Action { + + public static final ClusterAllocationExplainAction INSTANCE = new ClusterAllocationExplainAction(); + public static final String NAME = "cluster:monitor/allocation/explain"; + + private ClusterAllocationExplainAction() { + super(NAME); + } + + @Override + public ClusterAllocationExplainResponse newResponse() { + return new ClusterAllocationExplainResponse(); + } + + @Override + public ClusterAllocationExplainRequestBuilder newRequestBuilder(ElasticsearchClient client) { + return new ClusterAllocationExplainRequestBuilder(client, this); + } +} diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainRequest.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainRequest.java new file mode 100644 index 00000000000..d14785127d0 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainRequest.java @@ -0,0 +1,195 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.cluster.allocation; + +import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.support.master.MasterNodeRequest; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.XContentParser; + +import java.io.IOException; +import java.util.Objects; + +import static org.elasticsearch.action.ValidateActions.addValidationError; + +/** + * A request to explain the allocation of a shard in the cluster + */ +public class ClusterAllocationExplainRequest extends MasterNodeRequest { + + private String index; + private Integer shard; + private Boolean primary; + private boolean includeYesDecisions = false; + + /** Explain the first unassigned shard */ + public ClusterAllocationExplainRequest() { + this.index = null; + this.shard = null; + this.primary = null; + } + + /** + * Create a new allocation explain request. If {@code primary} is false, the first unassigned replica + * will be picked for explanation. If no replicas are unassigned, the first assigned replica will + * be explained. + */ + public ClusterAllocationExplainRequest(String index, int shard, boolean primary) { + this.index = index; + this.shard = shard; + this.primary = primary; + } + + @Override + public ActionRequestValidationException validate() { + ActionRequestValidationException validationException = null; + if (this.useAnyUnassignedShard() == false) { + if (this.index == null) { + validationException = addValidationError("index must be specified", validationException); + } + if (this.shard == null) { + validationException = addValidationError("shard must be specified", validationException); + } + if (this.primary == null) { + validationException = addValidationError("primary must be specified", validationException); + } + } + return validationException; + } + + /** + * Returns {@code true} iff the first unassigned shard is to be used + */ + public boolean useAnyUnassignedShard() { + return this.index == null && this.shard == null && this.primary == null; + } + + public ClusterAllocationExplainRequest setIndex(String index) { + this.index = index; + return this; + } + + @Nullable + public String getIndex() { + return this.index; + } + + public ClusterAllocationExplainRequest setShard(Integer shard) { + this.shard = shard; + return this; + } + + @Nullable + public int getShard() { + return this.shard; + } + + public ClusterAllocationExplainRequest setPrimary(Boolean primary) { + this.primary = primary; + return this; + } + + @Nullable + public boolean isPrimary() { + return this.primary; + } + + public void includeYesDecisions(boolean includeYesDecisions) { + this.includeYesDecisions = includeYesDecisions; + } + + /** Returns true if all decisions should be included. Otherwise only "NO" and "THROTTLE" decisions are returned */ + public boolean includeYesDecisions() { + return this.includeYesDecisions; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder("ClusterAllocationExplainRequest["); + if (this.useAnyUnassignedShard()) { + sb.append("useAnyUnassignedShard=true"); + } else { + sb.append("index=").append(index); + sb.append(",shard=").append(shard); + sb.append(",primary?=").append(primary); + } + sb.append(",includeYesDecisions?=").append(includeYesDecisions); + return sb.toString(); + } + + public static ClusterAllocationExplainRequest parse(XContentParser parser) throws IOException { + String currentFieldName = null; + String index = null; + Integer shard = null; + Boolean primary = null; + XContentParser.Token token; + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + currentFieldName = parser.currentName(); + } else if (token.isValue()) { + if ("index".equals(currentFieldName)) { + index = parser.text(); + } else if ("shard".equals(currentFieldName)) { + shard = parser.intValue(); + } else if ("primary".equals(currentFieldName)) { + primary = parser.booleanValue(); + } else { + throw new ElasticsearchParseException("unexpected field [" + currentFieldName + "] in allocation explain request"); + } + + } else if (token == XContentParser.Token.START_OBJECT) { + // the object was started + continue; + } else { + throw new ElasticsearchParseException("unexpected token [" + token + "] in allocation explain request"); + } + } + + if (index == null && shard == null && primary == null) { + // If it was an empty body, use the "any unassigned shard" request + return new ClusterAllocationExplainRequest(); + } else if (index == null || shard == null || primary == null) { + throw new ElasticsearchParseException("'index', 'shard', and 'primary' must be specified in allocation explain request"); + } + return new ClusterAllocationExplainRequest(index, shard, primary); + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + this.index = in.readOptionalString(); + this.shard = in.readOptionalVInt(); + this.primary = in.readOptionalBoolean(); + this.includeYesDecisions = in.readBoolean(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeOptionalString(index); + out.writeOptionalVInt(shard); + out.writeOptionalBoolean(primary); + out.writeBoolean(includeYesDecisions); + } +} diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainRequestBuilder.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainRequestBuilder.java new file mode 100644 index 00000000000..1a1950c7f11 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainRequestBuilder.java @@ -0,0 +1,66 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.cluster.allocation; + +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder; +import org.elasticsearch.client.ElasticsearchClient; + +/** + * Builder for requests to explain the allocation of a shard in the cluster + */ +public class ClusterAllocationExplainRequestBuilder + extends MasterNodeOperationRequestBuilder { + + public ClusterAllocationExplainRequestBuilder(ElasticsearchClient client, ClusterAllocationExplainAction action) { + super(client, action, new ClusterAllocationExplainRequest()); + } + + /** The index name to use when finding the shard to explain */ + public ClusterAllocationExplainRequestBuilder setIndex(String index) { + request.setIndex(index); + return this; + } + + /** The shard number to use when finding the shard to explain */ + public ClusterAllocationExplainRequestBuilder setShard(int shard) { + request.setShard(shard); + return this; + } + + /** Whether the primary or replica should be explained */ + public ClusterAllocationExplainRequestBuilder setPrimary(boolean primary) { + request.setPrimary(primary); + return this; + } + + /** + * Signal that the first unassigned shard should be used + */ + public ClusterAllocationExplainRequestBuilder useAnyUnassignedShard() { + request.setIndex(null); + request.setShard(null); + request.setPrimary(null); + return this; + } + +} diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainResponse.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainResponse.java new file mode 100644 index 00000000000..cc586bd1a58 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainResponse.java @@ -0,0 +1,61 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.cluster.allocation; + +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +/** + * Explanation response for a shard in the cluster + */ +public class ClusterAllocationExplainResponse extends ActionResponse { + + private ClusterAllocationExplanation cae; + + public ClusterAllocationExplainResponse() { + } + + public ClusterAllocationExplainResponse(ClusterAllocationExplanation cae) { + this.cae = cae; + } + + /** + * Return the explanation for shard allocation in the cluster + */ + public ClusterAllocationExplanation getExplanation() { + return this.cae; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + this.cae = new ClusterAllocationExplanation(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + cae.writeTo(out); + } +} diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplanation.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplanation.java new file mode 100644 index 00000000000..6b4173734b8 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplanation.java @@ -0,0 +1,200 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.cluster.allocation; + +import com.carrotsearch.hppc.cursors.ObjectObjectCursor; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.cluster.routing.allocation.decider.Decision; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.ToXContent.Params; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.index.shard.ShardId; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +/** + * A {@code ClusterAllocationExplanation} is an explanation of why a shard may or may not be allocated to nodes. It also includes weights + * for where the shard is likely to be assigned. It is an immutable class + */ +public final class ClusterAllocationExplanation implements ToXContent, Writeable { + + private final ShardId shard; + private final boolean primary; + private final String assignedNodeId; + private final Map nodeToDecision; + private final Map nodeWeights; + private final UnassignedInfo unassignedInfo; + + public ClusterAllocationExplanation(StreamInput in) throws IOException { + this.shard = ShardId.readShardId(in); + this.primary = in.readBoolean(); + this.assignedNodeId = in.readOptionalString(); + this.unassignedInfo = in.readOptionalWriteable(UnassignedInfo::new); + + Map ntd = null; + int size = in.readVInt(); + ntd = new HashMap<>(size); + for (int i = 0; i < size; i++) { + DiscoveryNode dn = DiscoveryNode.readNode(in); + Decision decision = Decision.readFrom(in); + ntd.put(dn, decision); + } + this.nodeToDecision = ntd; + + Map ntw = null; + size = in.readVInt(); + ntw = new HashMap<>(size); + for (int i = 0; i < size; i++) { + DiscoveryNode dn = DiscoveryNode.readNode(in); + float weight = in.readFloat(); + ntw.put(dn, weight); + } + this.nodeWeights = ntw; + } + + public ClusterAllocationExplanation(ShardId shard, boolean primary, @Nullable String assignedNodeId, + UnassignedInfo unassignedInfo, Map nodeToDecision, + Map nodeWeights) { + this.shard = shard; + this.primary = primary; + this.assignedNodeId = assignedNodeId; + this.unassignedInfo = unassignedInfo; + this.nodeToDecision = nodeToDecision == null ? Collections.emptyMap() : nodeToDecision; + this.nodeWeights = nodeWeights == null ? Collections.emptyMap() : nodeWeights; + } + + public ShardId getShard() { + return this.shard; + } + + public boolean isPrimary() { + return this.primary; + } + + /** Return turn if the shard is assigned to a node */ + public boolean isAssigned() { + return this.assignedNodeId != null; + } + + /** Return the assigned node id or null if not assigned */ + @Nullable + public String getAssignedNodeId() { + return this.assignedNodeId; + } + + /** Return the unassigned info for the shard or null if the shard is assigned */ + @Nullable + public UnassignedInfo getUnassignedInfo() { + return this.unassignedInfo; + } + + /** Return a map of node to decision for shard allocation */ + public Map getNodeDecisions() { + return this.nodeToDecision; + } + + /** + * Return a map of node to balancer "weight" for allocation. Higher weights mean the balancer wants to allocated the shard to that node + * more + */ + public Map getNodeWeights() { + return this.nodeWeights; + } + + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); { + builder.startObject("shard"); { + builder.field("index", shard.getIndexName()); + builder.field("index_uuid", shard.getIndex().getUUID()); + builder.field("id", shard.getId()); + builder.field("primary", primary); + } + builder.endObject(); // end shard + builder.field("assigned", this.assignedNodeId != null); + // If assigned, show the node id of the node it's assigned to + if (assignedNodeId != null) { + builder.field("assigned_node_id", this.assignedNodeId); + } + // If we have unassigned info, show that + if (unassignedInfo != null) { + unassignedInfo.toXContent(builder, params); + } + builder.startObject("nodes"); + for (Map.Entry entry : nodeWeights.entrySet()) { + DiscoveryNode node = entry.getKey(); + builder.startObject(node.getId()); { + builder.field("node_name", node.getName()); + builder.startObject("node_attributes"); { + for (ObjectObjectCursor attrKV : node.attributes()) { + builder.field(attrKV.key, attrKV.value); + } + } + builder.endObject(); // end attributes + Decision d = nodeToDecision.get(node); + if (node.getId().equals(assignedNodeId)) { + builder.field("final_decision", "CURRENTLY_ASSIGNED"); + } else { + builder.field("final_decision", d.type().toString()); + } + builder.field("weight", entry.getValue()); + d.toXContent(builder, params); + } + builder.endObject(); // end node + } + builder.endObject(); // end nodes + } + builder.endObject(); // end wrapping object + return builder; + } + + @Override + public ClusterAllocationExplanation readFrom(StreamInput in) throws IOException { + return new ClusterAllocationExplanation(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + this.getShard().writeTo(out); + out.writeBoolean(this.isPrimary()); + out.writeOptionalString(this.getAssignedNodeId()); + out.writeOptionalWriteable(this.getUnassignedInfo()); + + Map ntd = this.getNodeDecisions(); + out.writeVInt(ntd.size()); + for (Map.Entry entry : ntd.entrySet()) { + entry.getKey().writeTo(out); + Decision.writeTo(entry.getValue(), out); + } + Map ntw = this.getNodeWeights(); + out.writeVInt(ntw.size()); + for (Map.Entry entry : ntw.entrySet()) { + entry.getKey().writeTo(out); + out.writeFloat(entry.getValue()); + } + } +} diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/allocation/TransportClusterAllocationExplainAction.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/allocation/TransportClusterAllocationExplainAction.java new file mode 100644 index 00000000000..b9b31634bba --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/allocation/TransportClusterAllocationExplainAction.java @@ -0,0 +1,194 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.cluster.allocation; + +import com.carrotsearch.hppc.cursors.ObjectObjectCursor; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.cluster.ClusterInfoService; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.metadata.MetaData.Custom; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.cluster.routing.RoutingNodes; +import org.elasticsearch.cluster.routing.RoutingNodes.RoutingNodesIterator; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.cluster.routing.allocation.AllocationService; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; +import org.elasticsearch.cluster.routing.allocation.RoutingExplanations; +import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator; +import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; +import org.elasticsearch.cluster.routing.allocation.decider.Decision; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** + * The {@code TransportClusterAllocationExplainAction} is responsible for actually executing the explanation of a shard's allocation on the + * master node in the cluster. + */ +public class TransportClusterAllocationExplainAction + extends TransportMasterNodeAction { + + private final AllocationService allocationService; + private final ClusterInfoService clusterInfoService; + private final AllocationDeciders allocationDeciders; + private final ShardsAllocator shardAllocator; + + @Inject + public TransportClusterAllocationExplainAction(Settings settings, TransportService transportService, ClusterService clusterService, + ThreadPool threadPool, ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver, + AllocationService allocationService, ClusterInfoService clusterInfoService, + AllocationDeciders allocationDeciders, ShardsAllocator shardAllocator) { + super(settings, ClusterAllocationExplainAction.NAME, transportService, clusterService, threadPool, actionFilters, + indexNameExpressionResolver, ClusterAllocationExplainRequest::new); + this.allocationService = allocationService; + this.clusterInfoService = clusterInfoService; + this.allocationDeciders = allocationDeciders; + this.shardAllocator = shardAllocator; + } + + @Override + protected String executor() { + return ThreadPool.Names.MANAGEMENT; + } + + @Override + protected ClusterBlockException checkBlock(ClusterAllocationExplainRequest request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ); + } + + @Override + protected ClusterAllocationExplainResponse newResponse() { + return new ClusterAllocationExplainResponse(); + } + + /** + * Return the decisions for the given {@code ShardRouting} on the given {@code RoutingNode}. If {@code includeYesDecisions} is not true, + * only non-YES (NO and THROTTLE) decisions are returned. + */ + public static Decision tryShardOnNode(ShardRouting shard, RoutingNode node, RoutingAllocation allocation, boolean includeYesDecisions) { + Decision d = allocation.deciders().canAllocate(shard, node, allocation); + if (includeYesDecisions) { + return d; + } else { + Decision.Multi nonYesDecisions = new Decision.Multi(); + List decisions = d.getDecisions(); + for (Decision decision : decisions) { + if (decision.type() != Decision.Type.YES) { + nonYesDecisions.add(decision); + } + } + return nonYesDecisions; + } + } + + /** + * For the given {@code ShardRouting}, return the explanation of the allocation for that shard on all nodes. If {@code + * includeYesDecisions} is true, returns all decisions, otherwise returns only 'NO' and 'THROTTLE' decisions. + */ + public static ClusterAllocationExplanation explainShard(ShardRouting shard, RoutingAllocation allocation, RoutingNodes routingNodes, + boolean includeYesDecisions, ShardsAllocator shardAllocator) { + // don't short circuit deciders, we want a full explanation + allocation.debugDecision(true); + // get the existing unassigned info if available + UnassignedInfo ui = shard.unassignedInfo(); + + RoutingNodesIterator iter = routingNodes.nodes(); + Map nodeToDecision = new HashMap<>(); + while (iter.hasNext()) { + RoutingNode node = iter.next(); + DiscoveryNode discoNode = node.node(); + if (discoNode.isDataNode()) { + Decision d = tryShardOnNode(shard, node, allocation, includeYesDecisions); + nodeToDecision.put(discoNode, d); + } + } + return new ClusterAllocationExplanation(shard.shardId(), shard.primary(), shard.currentNodeId(), ui, nodeToDecision, + shardAllocator.weighShard(allocation, shard)); + } + + @Override + protected void masterOperation(final ClusterAllocationExplainRequest request, final ClusterState state, + final ActionListener listener) { + final RoutingNodes routingNodes = state.getRoutingNodes(); + final RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, state.nodes(), + clusterInfoService.getClusterInfo(), System.nanoTime()); + + ShardRouting shardRouting = null; + if (request.useAnyUnassignedShard()) { + // If we can use any shard, just pick the first unassigned one (if there are any) + RoutingNodes.UnassignedShards.UnassignedIterator ui = routingNodes.unassigned().iterator(); + if (ui.hasNext()) { + shardRouting = ui.next(); + } + } else { + String index = request.getIndex(); + int shard = request.getShard(); + if (request.isPrimary()) { + // If we're looking for the primary shard, there's only one copy, so pick it directly + shardRouting = allocation.routingTable().shardRoutingTable(index, shard).primaryShard(); + } else { + // If looking for a replica, go through all the replica shards + List replicaShardRoutings = allocation.routingTable().shardRoutingTable(index, shard).replicaShards(); + if (replicaShardRoutings.size() > 0) { + // Pick the first replica at the very least + shardRouting = replicaShardRoutings.get(0); + // In case there are multiple replicas where some are assigned and some aren't, + // try to find one that is unassigned at least + for (ShardRouting replica : replicaShardRoutings) { + if (replica.unassigned()) { + shardRouting = replica; + break; + } + } + } + } + } + + if (shardRouting == null) { + listener.onFailure(new ElasticsearchException("unable to find any shards to explain [{}] in the routing table", request)); + return; + } + logger.debug("explaining the allocation for [{}], found shard [{}]", request, shardRouting); + + ClusterAllocationExplanation cae = explainShard(shardRouting, allocation, routingNodes, + request.includeYesDecisions(), shardAllocator); + listener.onResponse(new ClusterAllocationExplainResponse(cae)); + } +} diff --git a/core/src/main/java/org/elasticsearch/client/ClusterAdminClient.java b/core/src/main/java/org/elasticsearch/client/ClusterAdminClient.java index d7c76906f91..ecfe307e6c1 100644 --- a/core/src/main/java/org/elasticsearch/client/ClusterAdminClient.java +++ b/core/src/main/java/org/elasticsearch/client/ClusterAdminClient.java @@ -21,6 +21,9 @@ package org.elasticsearch.client; import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainRequest; +import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainRequestBuilder; +import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainResponse; import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequestBuilder; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; @@ -572,4 +575,19 @@ public interface ClusterAdminClient extends ElasticsearchClient { * Simulates an ingest pipeline */ SimulatePipelineRequestBuilder prepareSimulatePipeline(BytesReference source); + + /** + * Explain the allocation of a shard + */ + void allocationExplain(ClusterAllocationExplainRequest request, ActionListener listener); + + /** + * Explain the allocation of a shard + */ + ActionFuture allocationExplain(ClusterAllocationExplainRequest request); + + /** + * Explain the allocation of a shard + */ + ClusterAllocationExplainRequestBuilder prepareAllocationExplain(); } diff --git a/core/src/main/java/org/elasticsearch/client/support/AbstractClient.java b/core/src/main/java/org/elasticsearch/client/support/AbstractClient.java index 0044890ee35..cb1252dc465 100644 --- a/core/src/main/java/org/elasticsearch/client/support/AbstractClient.java +++ b/core/src/main/java/org/elasticsearch/client/support/AbstractClient.java @@ -25,6 +25,10 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainAction; +import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainRequest; +import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainRequestBuilder; +import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainResponse; import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction; import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequestBuilder; @@ -1245,6 +1249,21 @@ public abstract class AbstractClient extends AbstractComponent implements Client public SimulatePipelineRequestBuilder prepareSimulatePipeline(BytesReference source) { return new SimulatePipelineRequestBuilder(this, SimulatePipelineAction.INSTANCE, source); } + + @Override + public void allocationExplain(ClusterAllocationExplainRequest request, ActionListener listener) { + execute(ClusterAllocationExplainAction.INSTANCE, request, listener); + } + + @Override + public ActionFuture allocationExplain(ClusterAllocationExplainRequest request) { + return execute(ClusterAllocationExplainAction.INSTANCE, request); + } + + @Override + public ClusterAllocationExplainRequestBuilder prepareAllocationExplain() { + return new ClusterAllocationExplainRequestBuilder(this, ClusterAllocationExplainAction.INSTANCE); + } } static class IndicesAdmin implements IndicesAdminClient { diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java b/core/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java index be7d90a1fef..b92fecf0f7b 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java @@ -139,7 +139,7 @@ public class UnassignedInfo implements ToXContent, Writeable { assert !(message == null && failure != null) : "provide a message if a failure exception is provided"; } - UnassignedInfo(StreamInput in) throws IOException { + public UnassignedInfo(StreamInput in) throws IOException { this.reason = Reason.values()[(int) in.readByte()]; this.unassignedTimeMillis = in.readLong(); // As System.nanoTime() cannot be compared across different JVMs, reset it to now. diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index 8102f206799..97a07169d25 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -23,6 +23,7 @@ import org.apache.lucene.util.ArrayUtil; import org.apache.lucene.util.IntroSorter; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.ShardRouting; @@ -100,6 +101,12 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards this.threshold = threshold; } + @Override + public Map weighShard(RoutingAllocation allocation, ShardRouting shard) { + final Balancer balancer = new Balancer(logger, allocation, weightFunction, threshold); + return balancer.weighShard(shard); + } + @Override public boolean allocate(RoutingAllocation allocation) { if (allocation.routingNodes().size() == 0) { @@ -298,6 +305,29 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards return balanceByWeights(); } + public Map weighShard(ShardRouting shard) { + final NodeSorter sorter = newNodeSorter(); + final ModelNode[] modelNodes = sorter.modelNodes; + final float[] weights = sorter.weights; + + buildWeightOrderedIndices(sorter); + Map nodes = new HashMap<>(modelNodes.length); + float currentNodeWeight = 0.0f; + for (int i = 0; i < modelNodes.length; i++) { + if (modelNodes[i].getNodeId().equals(shard.currentNodeId())) { + // If a node was found with the shard, use that weight instead of 0.0 + currentNodeWeight = weights[i]; + break; + } + } + + for (int i = 0; i < modelNodes.length; i++) { + final float delta = currentNodeWeight - weights[i]; + nodes.put(modelNodes[i].getRoutingNode().node(), delta); + } + return nodes; + } + /** * Balances the nodes on the cluster model according to the weight * function. The configured threshold is the minimum delta between the diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocator.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocator.java index 0bf07e8cba9..aa59e7788f3 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocator.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocator.java @@ -19,8 +19,11 @@ package org.elasticsearch.cluster.routing.allocation.allocator; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; +import java.util.Map; /** *

* A {@link ShardsAllocator} is the main entry point for shard allocation on nodes in the cluster. @@ -40,4 +43,15 @@ public interface ShardsAllocator { * @return true if the allocation has changed, otherwise false */ boolean allocate(RoutingAllocation allocation); + + /** + * Returns a map of node to a float "weight" of where the allocator would like to place the shard. + * Higher weights signify greater desire to place the shard on that node. + * Does not modify the allocation at all. + * + * @param allocation current node allocation + * @param shard shard to weigh + * @return map of nodes to float weights + */ + Map weighShard(RoutingAllocation allocation, ShardRouting shard); } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java index 227ec277469..baa0a3b1c0e 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java @@ -110,7 +110,8 @@ public class AwarenessAllocationDecider extends AllocationDecider { this.awarenessAttributes = CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.get(settings); clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING, this::setAwarenessAttributes); setForcedAwarenessAttributes(CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING.get(settings)); - clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING, this::setForcedAwarenessAttributes); + clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING, + this::setForcedAwarenessAttributes); } private void setForcedAwarenessAttributes(Settings forceSettings) { @@ -150,7 +151,7 @@ public class AwarenessAllocationDecider extends AllocationDecider { private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation, boolean moveToNode) { if (awarenessAttributes.length == 0) { - return allocation.decision(Decision.YES, NAME, "no allocation awareness enabled"); + return allocation.decision(Decision.YES, NAME, "allocation awareness is not enabled"); } IndexMetaData indexMetaData = allocation.metaData().getIndexSafe(shardRouting.index()); @@ -158,7 +159,7 @@ public class AwarenessAllocationDecider extends AllocationDecider { for (String awarenessAttribute : awarenessAttributes) { // the node the shard exists on must be associated with an awareness attribute if (!node.node().attributes().containsKey(awarenessAttribute)) { - return allocation.decision(Decision.NO, NAME, "node does not contain awareness attribute: [%s]", awarenessAttribute); + return allocation.decision(Decision.NO, NAME, "node does not contain the awareness attribute: [%s]", awarenessAttribute); } // build attr_value -> nodes map @@ -180,7 +181,8 @@ public class AwarenessAllocationDecider extends AllocationDecider { String nodeId = shardRouting.relocating() ? shardRouting.relocatingNodeId() : shardRouting.currentNodeId(); if (!node.nodeId().equals(nodeId)) { // we work on different nodes, move counts around - shardPerAttribute.putOrAdd(allocation.routingNodes().node(nodeId).node().attributes().get(awarenessAttribute), 0, -1); + shardPerAttribute.putOrAdd(allocation.routingNodes().node(nodeId).node().attributes().get(awarenessAttribute), + 0, -1); shardPerAttribute.addTo(node.node().attributes().get(awarenessAttribute), 1); } } else { @@ -215,8 +217,15 @@ public class AwarenessAllocationDecider extends AllocationDecider { // if we are above with leftover, then we know we are not good, even with mod if (currentNodeCount > (requiredCountPerAttribute + leftoverPerAttribute)) { return allocation.decision(Decision.NO, NAME, - "too many shards on node for attribute: [%s], required per attribute: [%d], node count: [%d], leftover: [%d]", - awarenessAttribute, requiredCountPerAttribute, currentNodeCount, leftoverPerAttribute); + "there are too many shards on the node for attribute [%s], there are [%d] total shards for the index " + + " and [%d] total attributes values, expected the node count [%d] to be lower or equal to the required " + + "number of shards per attribute [%d] plus leftover [%d]", + awarenessAttribute, + shardCount, + numberOfAttributes, + currentNodeCount, + requiredCountPerAttribute, + leftoverPerAttribute); } // all is well, we are below or same as average if (currentNodeCount <= requiredCountPerAttribute) { @@ -224,6 +233,6 @@ public class AwarenessAllocationDecider extends AllocationDecider { } } - return allocation.decision(Decision.YES, NAME, "node meets awareness requirements"); + return allocation.decision(Decision.YES, NAME, "node meets all awareness attribute requirements"); } } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ClusterRebalanceAllocationDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ClusterRebalanceAllocationDecider.java index 84e974aceb0..740c99016db 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ClusterRebalanceAllocationDecider.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ClusterRebalanceAllocationDecider.java @@ -78,7 +78,8 @@ public class ClusterRebalanceAllocationDecider extends AllocationDecider { } else if ("indices_all_active".equalsIgnoreCase(typeString) || "indicesAllActive".equalsIgnoreCase(typeString)) { return ClusterRebalanceType.INDICES_ALL_ACTIVE; } - throw new IllegalArgumentException("Illegal value for " + CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING + ": " + typeString); + throw new IllegalArgumentException("Illegal value for " + + CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING + ": " + typeString); } } @@ -90,10 +91,13 @@ public class ClusterRebalanceAllocationDecider extends AllocationDecider { try { type = CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.get(settings); } catch (IllegalStateException e) { - logger.warn("[{}] has a wrong value {}, defaulting to 'indices_all_active'", CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING, CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getRaw(settings)); + logger.warn("[{}] has a wrong value {}, defaulting to 'indices_all_active'", + CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING, + CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getRaw(settings)); type = ClusterRebalanceType.INDICES_ALL_ACTIVE; } - logger.debug("using [{}] with [{}]", CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), type.toString().toLowerCase(Locale.ROOT)); + logger.debug("using [{}] with [{}]", CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), + type.toString().toLowerCase(Locale.ROOT)); clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING, this::setType); } @@ -112,11 +116,13 @@ public class ClusterRebalanceAllocationDecider extends AllocationDecider { if (type == ClusterRebalanceType.INDICES_PRIMARIES_ACTIVE) { // check if there are unassigned primaries. if ( allocation.routingNodes().hasUnassignedPrimaries() ) { - return allocation.decision(Decision.NO, NAME, "cluster has unassigned primary shards"); + return allocation.decision(Decision.NO, NAME, + "the cluster has unassigned primary shards and rebalance type is set to [%s]", type); } // check if there are initializing primaries that don't have a relocatingNodeId entry. if ( allocation.routingNodes().hasInactivePrimaries() ) { - return allocation.decision(Decision.NO, NAME, "cluster has inactive primary shards"); + return allocation.decision(Decision.NO, NAME, + "the cluster has inactive primary shards and rebalance type is set to [%s]", type); } return allocation.decision(Decision.YES, NAME, "all primary shards are active"); @@ -124,15 +130,17 @@ public class ClusterRebalanceAllocationDecider extends AllocationDecider { if (type == ClusterRebalanceType.INDICES_ALL_ACTIVE) { // check if there are unassigned shards. if (allocation.routingNodes().hasUnassignedShards() ) { - return allocation.decision(Decision.NO, NAME, "cluster has unassigned shards"); + return allocation.decision(Decision.NO, NAME, + "the cluster has unassigned shards and rebalance type is set to [%s]", type); } // in case all indices are assigned, are there initializing shards which // are not relocating? if ( allocation.routingNodes().hasInactiveShards() ) { - return allocation.decision(Decision.NO, NAME, "cluster has inactive shards"); + return allocation.decision(Decision.NO, NAME, + "the cluster has inactive shards and rebalance type is set to [%s]", type); } } // type == Type.ALWAYS - return allocation.decision(Decision.YES, NAME, "all shards are active"); + return allocation.decision(Decision.YES, NAME, "all shards are active, rebalance type is [%s]", type); } } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java index fe6bf918dc2..2c46f7bd549 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java @@ -53,7 +53,8 @@ public class ConcurrentRebalanceAllocationDecider extends AllocationDecider { super(settings); this.clusterConcurrentRebalance = CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING.get(settings); logger.debug("using [cluster_concurrent_rebalance] with [{}]", clusterConcurrentRebalance); - clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING, this::setClusterConcurrentRebalance); + clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING, + this::setClusterConcurrentRebalance); } private void setClusterConcurrentRebalance(int concurrentRebalance) { @@ -63,12 +64,16 @@ public class ConcurrentRebalanceAllocationDecider extends AllocationDecider { @Override public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation allocation) { if (clusterConcurrentRebalance == -1) { - return allocation.decision(Decision.YES, NAME, "all concurrent rebalances are allowed"); + return allocation.decision(Decision.YES, NAME, "unlimited concurrent rebalances are allowed"); } - if (allocation.routingNodes().getRelocatingShardCount() >= clusterConcurrentRebalance) { - return allocation.decision(Decision.NO, NAME, "too many concurrent rebalances [%d], limit: [%d]", - allocation.routingNodes().getRelocatingShardCount(), clusterConcurrentRebalance); + int relocatingShards = allocation.routingNodes().getRelocatingShardCount(); + if (relocatingShards >= clusterConcurrentRebalance) { + return allocation.decision(Decision.NO, NAME, + "too many shards are concurrently rebalancing [%d], limit: [%d]", + relocatingShards, clusterConcurrentRebalance); } - return allocation.decision(Decision.YES, NAME, "below threshold [%d] for concurrent rebalances", clusterConcurrentRebalance); + return allocation.decision(Decision.YES, NAME, + "below threshold [%d] for concurrent rebalances, current rebalance shard count [%d]", + clusterConcurrentRebalance, relocatingShards); } } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/Decision.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/Decision.java index 02fc2fef948..ebf92302902 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/Decision.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/Decision.java @@ -26,6 +26,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Locale; @@ -145,6 +146,11 @@ public abstract class Decision implements ToXContent { public abstract String label(); + /** + * Return the list of all decisions that make up this decision + */ + public abstract List getDecisions(); + /** * Simple class representing a single decision */ @@ -191,6 +197,11 @@ public abstract class Decision implements ToXContent { return this.label; } + @Override + public List getDecisions() { + return Collections.singletonList(this); + } + /** * Returns the explanation string, fully formatted. Only formats the string once */ @@ -202,11 +213,35 @@ public abstract class Decision implements ToXContent { } @Override - public String toString() { - if (explanation == null) { - return type + "()"; + public boolean equals(Object object) { + if (this == object) { + return true; } - return type + "(" + getExplanation() + ")"; + + if (object == null || getClass() != object.getClass()) { + return false; + } + + Decision.Single s = (Decision.Single) object; + return this.type == s.type && + this.label.equals(s.label) && + this.getExplanation().equals(s.getExplanation()); + } + + @Override + public int hashCode() { + int result = this.type.hashCode(); + result = 31 * result + this.label.hashCode(); + result = 31 * result + this.getExplanation().hashCode(); + return result; + } + + @Override + public String toString() { + if (explanationString != null || explanation != null) { + return type + "(" + getExplanation() + ")"; + } + return type + "()"; } @Override @@ -258,6 +293,31 @@ public abstract class Decision implements ToXContent { return null; } + @Override + public List getDecisions() { + return Collections.unmodifiableList(this.decisions); + } + + @Override + public boolean equals(final Object object) { + if (this == object) { + return true; + } + + if (object == null || getClass() != object.getClass()) { + return false; + } + + final Decision.Multi m = (Decision.Multi) object; + + return this.decisions.equals(m.decisions); + } + + @Override + public int hashCode() { + return 31 * decisions.hashCode(); + } + @Override public String toString() { StringBuilder sb = new StringBuilder(); diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java index e2124558f2d..890bbd3c31d 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java @@ -164,7 +164,8 @@ public class DiskThresholdDecider extends AllocationDecider { reroute = true; explanation = "high disk watermark exceeded on one or more nodes"; } else { - logger.debug("high disk watermark exceeded on {} but an automatic reroute has occurred in the last [{}], skipping reroute", + logger.debug("high disk watermark exceeded on {} but an automatic reroute has occurred " + + "in the last [{}], skipping reroute", node, DiskThresholdDecider.this.rerouteInterval); } nodeHasPassedWatermark.add(node); @@ -183,7 +184,8 @@ public class DiskThresholdDecider extends AllocationDecider { explanation = "one or more nodes has gone under the high or low watermark"; nodeHasPassedWatermark.remove(node); } else { - logger.debug("{} has gone below a disk threshold, but an automatic reroute has occurred in the last [{}], skipping reroute", + logger.debug("{} has gone below a disk threshold, but an automatic reroute has occurred " + + "in the last [{}], skipping reroute", node, DiskThresholdDecider.this.rerouteInterval); } } @@ -238,13 +240,15 @@ public class DiskThresholdDecider extends AllocationDecider { private void setLowWatermark(String lowWatermark) { // Watermark is expressed in terms of used data, but we need "free" data watermark this.freeDiskThresholdLow = 100.0 - thresholdPercentageFromWatermark(lowWatermark); - this.freeBytesThresholdLow = thresholdBytesFromWatermark(lowWatermark, CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey()); + this.freeBytesThresholdLow = thresholdBytesFromWatermark(lowWatermark, + CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey()); } private void setHighWatermark(String highWatermark) { // Watermark is expressed in terms of used data, but we need "free" data watermark this.freeDiskThresholdHigh = 100.0 - thresholdPercentageFromWatermark(highWatermark); - this.freeBytesThresholdHigh = thresholdBytesFromWatermark(highWatermark, CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey()); + this.freeBytesThresholdHigh = thresholdBytesFromWatermark(highWatermark, + CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey()); } // For Testing @@ -299,7 +303,8 @@ public class DiskThresholdDecider extends AllocationDecider { * If subtractShardsMovingAway is set then the size of shards moving away is subtracted from the total size * of all shards */ - public static long sizeOfRelocatingShards(RoutingNode node, ClusterInfo clusterInfo, boolean subtractShardsMovingAway, String dataPath) { + public static long sizeOfRelocatingShards(RoutingNode node, ClusterInfo clusterInfo, + boolean subtractShardsMovingAway, String dataPath) { long totalSize = 0; for (ShardRouting routing : node.shardsWithState(ShardRoutingState.RELOCATING, ShardRoutingState.INITIALIZING)) { String actualPath = clusterInfo.getDataPath(routing); @@ -353,7 +358,8 @@ public class DiskThresholdDecider extends AllocationDecider { logger.debug("less than the required {} free bytes threshold ({} bytes free) on node {}, preventing allocation", freeBytesThresholdLow, freeBytes, node.nodeId()); } - return allocation.decision(Decision.NO, NAME, "less than required [%s] free on node, free: [%s]", + return allocation.decision(Decision.NO, NAME, + "the node is above the low watermark and has less than required [%s] free, free: [%s]", freeBytesThresholdLow, new ByteSizeValue(freeBytes)); } else if (freeBytes > freeBytesThresholdHigh.bytes()) { // Allow the shard to be allocated because it is primary that @@ -363,7 +369,8 @@ public class DiskThresholdDecider extends AllocationDecider { "but allowing allocation because primary has never been allocated", freeBytesThresholdLow, freeBytes, node.nodeId()); } - return allocation.decision(Decision.YES, NAME, "primary has never been allocated before"); + return allocation.decision(Decision.YES, NAME, + "the node is above the low watermark, but this primary shard has never been allocated before"); } else { // Even though the primary has never been allocated, the node is // above the high watermark, so don't allow allocating the shard @@ -372,7 +379,9 @@ public class DiskThresholdDecider extends AllocationDecider { "preventing allocation even though primary has never been allocated", freeBytesThresholdHigh, freeBytes, node.nodeId()); } - return allocation.decision(Decision.NO, NAME, "less than required [%s] free on node, free: [%s]", + return allocation.decision(Decision.NO, NAME, + "the node is above the high watermark even though this shard has never been allocated " + + "and has less than required [%s] free on node, free: [%s]", freeBytesThresholdHigh, new ByteSizeValue(freeBytes)); } } @@ -386,7 +395,8 @@ public class DiskThresholdDecider extends AllocationDecider { Strings.format1Decimals(usedDiskThresholdLow, "%"), Strings.format1Decimals(usedDiskPercentage, "%"), node.nodeId()); } - return allocation.decision(Decision.NO, NAME, "more than allowed [%s%%] used disk on node, free: [%s%%]", + return allocation.decision(Decision.NO, NAME, + "the node is above the low watermark and has more than allowed [%s%%] used disk, free: [%s%%]", usedDiskThresholdLow, freeDiskPercentage); } else if (freeDiskPercentage > freeDiskThresholdHigh) { // Allow the shard to be allocated because it is primary that @@ -397,7 +407,8 @@ public class DiskThresholdDecider extends AllocationDecider { Strings.format1Decimals(usedDiskThresholdLow, "%"), Strings.format1Decimals(usedDiskPercentage, "%"), node.nodeId()); } - return allocation.decision(Decision.YES, NAME, "primary has never been allocated before"); + return allocation.decision(Decision.YES, NAME, + "the node is above the low watermark, but this primary shard has never been allocated before"); } else { // Even though the primary has never been allocated, the node is // above the high watermark, so don't allow allocating the shard @@ -407,7 +418,9 @@ public class DiskThresholdDecider extends AllocationDecider { Strings.format1Decimals(freeDiskThresholdHigh, "%"), Strings.format1Decimals(freeDiskPercentage, "%"), node.nodeId()); } - return allocation.decision(Decision.NO, NAME, "more than allowed [%s%%] used disk on node, free: [%s%%]", + return allocation.decision(Decision.NO, NAME, + "the node is above the high watermark even though this shard has never been allocated " + + "and has more than allowed [%s%%] used disk, free: [%s%%]", usedDiskThresholdHigh, freeDiskPercentage); } } @@ -417,19 +430,29 @@ public class DiskThresholdDecider extends AllocationDecider { double freeSpaceAfterShard = freeDiskPercentageAfterShardAssigned(usage, shardSize); long freeBytesAfterShard = freeBytes - shardSize; if (freeBytesAfterShard < freeBytesThresholdHigh.bytes()) { - logger.warn("after allocating, node [{}] would have less than the required {} free bytes threshold ({} bytes free), preventing allocation", + logger.warn("after allocating, node [{}] would have less than the required " + + "{} free bytes threshold ({} bytes free), preventing allocation", node.nodeId(), freeBytesThresholdHigh, freeBytesAfterShard); - return allocation.decision(Decision.NO, NAME, "after allocation less than required [%s] free on node, free: [%s]", + return allocation.decision(Decision.NO, NAME, + "after allocating the shard to this node, it would be above the high watermark " + + "and have less than required [%s] free, free: [%s]", freeBytesThresholdLow, new ByteSizeValue(freeBytesAfterShard)); } if (freeSpaceAfterShard < freeDiskThresholdHigh) { - logger.warn("after allocating, node [{}] would have more than the allowed {} free disk threshold ({} free), preventing allocation", + logger.warn("after allocating, node [{}] would have more than the allowed " + + "{} free disk threshold ({} free), preventing allocation", node.nodeId(), Strings.format1Decimals(freeDiskThresholdHigh, "%"), Strings.format1Decimals(freeSpaceAfterShard, "%")); - return allocation.decision(Decision.NO, NAME, "after allocation more than allowed [%s%%] used disk on node, free: [%s%%]", + return allocation.decision(Decision.NO, NAME, + "after allocating the shard to this node, it would be above the high watermark " + + "and have more than allowed [%s%%] used disk, free: [%s%%]", usedDiskThresholdLow, freeSpaceAfterShard); } - return allocation.decision(Decision.YES, NAME, "enough disk for shard on node, free: [%s]", new ByteSizeValue(freeBytes)); + return allocation.decision(Decision.YES, NAME, + "enough disk for shard on node, free: [%s], shard size: [%s], free after allocating shard: [%s]", + new ByteSizeValue(freeBytes), + new ByteSizeValue(shardSize), + new ByteSizeValue(freeBytesAfterShard)); } @Override @@ -453,14 +476,17 @@ public class DiskThresholdDecider extends AllocationDecider { logger.trace("node [{}] has {}% free disk ({} bytes)", node.nodeId(), freeDiskPercentage, freeBytes); } if (dataPath == null || usage.getPath().equals(dataPath) == false) { - return allocation.decision(Decision.YES, NAME, "shard is not allocated on the most utilized disk"); + return allocation.decision(Decision.YES, NAME, + "this shard is not allocated on the most utilized disk and can remain"); } if (freeBytes < freeBytesThresholdHigh.bytes()) { if (logger.isDebugEnabled()) { logger.debug("less than the required {} free bytes threshold ({} bytes free) on node {}, shard cannot remain", freeBytesThresholdHigh, freeBytes, node.nodeId()); } - return allocation.decision(Decision.NO, NAME, "after allocation less than required [%s] free on node, free: [%s]", + return allocation.decision(Decision.NO, NAME, + "after allocating this shard this node would be above the high watermark " + + "and there would be less than required [%s] free on node, free: [%s]", freeBytesThresholdHigh, new ByteSizeValue(freeBytes)); } if (freeDiskPercentage < freeDiskThresholdHigh) { @@ -468,11 +494,14 @@ public class DiskThresholdDecider extends AllocationDecider { logger.debug("less than the required {}% free disk threshold ({}% free) on node {}, shard cannot remain", freeDiskThresholdHigh, freeDiskPercentage, node.nodeId()); } - return allocation.decision(Decision.NO, NAME, "after allocation less than required [%s%%] free disk on node, free: [%s%%]", + return allocation.decision(Decision.NO, NAME, + "after allocating this shard this node would be above the high watermark " + + "and there would be less than required [%s%%] free disk on node, free: [%s%%]", freeDiskThresholdHigh, freeDiskPercentage); } - return allocation.decision(Decision.YES, NAME, "enough disk for shard to remain on node, free: [%s]", new ByteSizeValue(freeBytes)); + return allocation.decision(Decision.YES, NAME, + "there is enough disk on this node for the shard to remain, free: [%s]", new ByteSizeValue(freeBytes)); } private DiskUsage getDiskUsage(RoutingNode node, RoutingAllocation allocation, ImmutableOpenMap usages) { @@ -543,7 +572,8 @@ public class DiskThresholdDecider extends AllocationDecider { try { return RatioValue.parseRatioValue(watermark).getAsPercent(); } catch (ElasticsearchParseException ex) { - // NOTE: this is not end-user leniency, since up above we check that it's a valid byte or percentage, and then store the two cases separately + // NOTE: this is not end-user leniency, since up above we check that it's a valid byte or percentage, and then store the two + // cases separately return 100.0; } } @@ -556,7 +586,8 @@ public class DiskThresholdDecider extends AllocationDecider { try { return ByteSizeValue.parseBytesSizeValue(watermark, settingName); } catch (ElasticsearchParseException ex) { - // NOTE: this is not end-user leniency, since up above we check that it's a valid byte or percentage, and then store the two cases separately + // NOTE: this is not end-user leniency, since up above we check that it's a valid byte or percentage, and then store the two + // cases separately return ByteSizeValue.parseBytesSizeValue("0b", settingName); } } @@ -583,7 +614,7 @@ public class DiskThresholdDecider extends AllocationDecider { private Decision earlyTerminate(RoutingAllocation allocation, ImmutableOpenMap usages) { // Always allow allocation if the decider is disabled if (!enabled) { - return allocation.decision(Decision.YES, NAME, "disk threshold decider disabled"); + return allocation.decision(Decision.YES, NAME, "the disk threshold decider is disabled"); } // Allow allocation regardless if only a single data node is available @@ -591,7 +622,7 @@ public class DiskThresholdDecider extends AllocationDecider { if (logger.isTraceEnabled()) { logger.trace("only a single data node is present, allowing allocation"); } - return allocation.decision(Decision.YES, NAME, "only a single data node is present"); + return allocation.decision(Decision.YES, NAME, "there is only a single data node present"); } // Fail open there is no info available @@ -600,7 +631,7 @@ public class DiskThresholdDecider extends AllocationDecider { if (logger.isTraceEnabled()) { logger.trace("cluster info unavailable for disk threshold decider, allowing allocation."); } - return allocation.decision(Decision.YES, NAME, "cluster info unavailable"); + return allocation.decision(Decision.YES, NAME, "the cluster info is unavailable"); } // Fail open if there are no disk usages available @@ -608,7 +639,7 @@ public class DiskThresholdDecider extends AllocationDecider { if (logger.isTraceEnabled()) { logger.trace("unable to determine disk usages for disk-aware allocation, allowing allocation"); } - return allocation.decision(Decision.YES, NAME, "disk usages unavailable"); + return allocation.decision(Decision.YES, NAME, "disk usages are unavailable"); } return null; } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/EnableAllocationDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/EnableAllocationDecider.java index 0b69ba2a19e..38a2a39fc7c 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/EnableAllocationDecider.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/EnableAllocationDecider.java @@ -32,8 +32,9 @@ import org.elasticsearch.common.settings.Settings; import java.util.Locale; /** - * This allocation decider allows shard allocations / rebalancing via the cluster wide settings {@link #CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING} / - * {@link #CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING} and the per index setting {@link #INDEX_ROUTING_ALLOCATION_ENABLE_SETTING} / {@link #INDEX_ROUTING_REBALANCE_ENABLE_SETTING}. + * This allocation decider allows shard allocations / rebalancing via the cluster wide settings + * {@link #CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING} / {@link #CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING} and the per index setting + * {@link #INDEX_ROUTING_ALLOCATION_ENABLE_SETTING} / {@link #INDEX_ROUTING_REBALANCE_ENABLE_SETTING}. * The per index settings overrides the cluster wide setting. * *

@@ -98,7 +99,7 @@ public class EnableAllocationDecider extends AllocationDecider { @Override public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { if (allocation.ignoreDisable()) { - return allocation.decision(Decision.YES, NAME, "allocation disabling is ignored"); + return allocation.decision(Decision.YES, NAME, "allocation is explicitly ignoring any disabling of allocation"); } final IndexMetaData indexMetaData = allocation.metaData().getIndexSafe(shardRouting.index()); @@ -133,7 +134,7 @@ public class EnableAllocationDecider extends AllocationDecider { @Override public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation allocation) { if (allocation.ignoreDisable()) { - return allocation.decision(Decision.YES, NAME, "rebalance disabling is ignored"); + return allocation.decision(Decision.YES, NAME, "allocation is explicitly ignoring any disabling of relocation"); } Settings indexSettings = allocation.routingNodes().metaData().getIndexSafe(shardRouting.index()).getSettings(); @@ -167,7 +168,8 @@ public class EnableAllocationDecider extends AllocationDecider { /** * Allocation values or rather their string representation to be used used with - * {@link EnableAllocationDecider#CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING} / {@link EnableAllocationDecider#INDEX_ROUTING_ALLOCATION_ENABLE_SETTING} + * {@link EnableAllocationDecider#CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING} / + * {@link EnableAllocationDecider#INDEX_ROUTING_ALLOCATION_ENABLE_SETTING} * via cluster / index settings. */ public enum Allocation { @@ -193,7 +195,8 @@ public class EnableAllocationDecider extends AllocationDecider { /** * Rebalance values or rather their string representation to be used used with - * {@link EnableAllocationDecider#CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING} / {@link EnableAllocationDecider#INDEX_ROUTING_REBALANCE_ENABLE_SETTING} + * {@link EnableAllocationDecider#CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING} / + * {@link EnableAllocationDecider#INDEX_ROUTING_REBALANCE_ENABLE_SETTING} * via cluster / index settings. */ public enum Rebalance { diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/FilterAllocationDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/FilterAllocationDecider.java index d1aa0d8b583..eb59c261214 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/FilterAllocationDecider.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/FilterAllocationDecider.java @@ -50,11 +50,14 @@ import static org.elasticsearch.cluster.node.DiscoveryNodeFilters.OpType.OR; * would disallow the allocation. Filters are applied in the following order: *

    *
  1. required - filters required allocations. - * If any required filters are set the allocation is denied if the index is not in the set of required to allocate on the filtered node
  2. + * If any required filters are set the allocation is denied if the index is not in the set of required to allocate + * on the filtered node *
  3. include - filters "allowed" allocations. - * If any include filters are set the allocation is denied if the index is not in the set of include filters for the filtered node
  4. + * If any include filters are set the allocation is denied if the index is not in the set of include filters for + * the filtered node *
  5. exclude - filters "prohibited" allocations. - * If any exclude filters are set the allocation is denied if the index is in the set of exclude filters for the filtered node
  6. + * If any exclude filters are set the allocation is denied if the index is in the set of exclude filters for the + * filtered node *
*/ public class FilterAllocationDecider extends AllocationDecider { diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/NodeVersionAllocationDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/NodeVersionAllocationDecider.java index eb9c5cf8ee0..95540d89a6f 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/NodeVersionAllocationDecider.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/NodeVersionAllocationDecider.java @@ -52,7 +52,7 @@ public class NodeVersionAllocationDecider extends AllocationDecider { return isVersionCompatible(shardRouting.restoreSource(), node, allocation); } else { // fresh primary, we can allocate wherever - return allocation.decision(Decision.YES, NAME, "primary shard can be allocated anywhere"); + return allocation.decision(Decision.YES, NAME, "the primary shard is new and can be allocated anywhere"); } } else { // relocating primary, only migrate to newer host @@ -70,16 +70,17 @@ public class NodeVersionAllocationDecider extends AllocationDecider { } } - private Decision isVersionCompatible(final RoutingNodes routingNodes, final String sourceNodeId, final RoutingNode target, RoutingAllocation allocation) { + private Decision isVersionCompatible(final RoutingNodes routingNodes, final String sourceNodeId, final RoutingNode target, + RoutingAllocation allocation) { final RoutingNode source = routingNodes.node(sourceNodeId); if (target.node().version().onOrAfter(source.node().version())) { /* we can allocate if we can recover from a node that is younger or on the same version * if the primary is already running on a newer version that won't work due to possible * differences in the lucene index format etc.*/ - return allocation.decision(Decision.YES, NAME, "target node version [%s] is same or newer than source node version [%s]", + return allocation.decision(Decision.YES, NAME, "target node version [%s] is the same or newer than source node version [%s]", target.node().version(), source.node().version()); } else { - return allocation.decision(Decision.NO, NAME, "target node version [%s] is older than source node version [%s]", + return allocation.decision(Decision.NO, NAME, "target node version [%s] is older than the source node version [%s]", target.node().version(), source.node().version()); } } @@ -87,10 +88,10 @@ public class NodeVersionAllocationDecider extends AllocationDecider { private Decision isVersionCompatible(RestoreSource restoreSource, final RoutingNode target, RoutingAllocation allocation) { if (target.node().version().onOrAfter(restoreSource.version())) { /* we can allocate if we can restore from a snapshot that is older or on the same version */ - return allocation.decision(Decision.YES, NAME, "target node version [%s] is same or newer than snapshot version [%s]", + return allocation.decision(Decision.YES, NAME, "target node version [%s] is the same or newer than snapshot version [%s]", target.node().version(), restoreSource.version()); } else { - return allocation.decision(Decision.NO, NAME, "target node version [%s] is older than snapshot version [%s]", + return allocation.decision(Decision.NO, NAME, "target node version [%s] is older than the snapshot version [%s]", target.node().version(), restoreSource.version()); } } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/RebalanceOnlyWhenActiveAllocationDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/RebalanceOnlyWhenActiveAllocationDecider.java index 039abd8749c..869c6313069 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/RebalanceOnlyWhenActiveAllocationDecider.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/RebalanceOnlyWhenActiveAllocationDecider.java @@ -41,8 +41,8 @@ public class RebalanceOnlyWhenActiveAllocationDecider extends AllocationDecider // its ok to check for active here, since in relocation, a shard is split into two in routing // nodes, once relocating, and one initializing if (!allocation.routingNodes().allReplicasActive(shardRouting)) { - return allocation.decision(Decision.NO, NAME, "not all replicas are active in cluster"); + return allocation.decision(Decision.NO, NAME, "rebalancing can not occur if not all replicas are active in the cluster"); } - return allocation.decision(Decision.YES, NAME, "all replicas are active in cluster"); + return allocation.decision(Decision.YES, NAME, "all replicas are active in the cluster, rebalancing can occur"); } } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ReplicaAfterPrimaryActiveAllocationDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ReplicaAfterPrimaryActiveAllocationDecider.java index 1c5a3f93b7b..59ab67c309e 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ReplicaAfterPrimaryActiveAllocationDecider.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ReplicaAfterPrimaryActiveAllocationDecider.java @@ -45,12 +45,12 @@ public class ReplicaAfterPrimaryActiveAllocationDecider extends AllocationDecide @Override public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocation) { if (shardRouting.primary()) { - return allocation.decision(Decision.YES, NAME, "shard is primary"); + return allocation.decision(Decision.YES, NAME, "shard is primary and can be allocated"); } ShardRouting primary = allocation.routingNodes().activePrimary(shardRouting); if (primary == null) { - return allocation.decision(Decision.NO, NAME, "primary shard is not yet active"); + return allocation.decision(Decision.NO, NAME, "primary shard for this replica is not yet active"); } - return allocation.decision(Decision.YES, NAME, "primary is already active"); + return allocation.decision(Decision.YES, NAME, "primary shard for this replica is already active"); } } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/SameShardAllocationDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/SameShardAllocationDecider.java index 44eb7d0e2f9..f0b4fdf35c6 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/SameShardAllocationDecider.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/SameShardAllocationDecider.java @@ -61,7 +61,8 @@ public class SameShardAllocationDecider extends AllocationDecider { Iterable assignedShards = allocation.routingNodes().assignedShards(shardRouting); for (ShardRouting assignedShard : assignedShards) { if (node.nodeId().equals(assignedShard.currentNodeId())) { - return allocation.decision(Decision.NO, NAME, "shard cannot be allocated on same node [%s] it already exists on", node.nodeId()); + return allocation.decision(Decision.NO, NAME, + "the shard cannot be allocated on the same node id [%s] on which it already exists", node.nodeId()); } } if (sameHost) { @@ -85,7 +86,7 @@ public class SameShardAllocationDecider extends AllocationDecider { for (ShardRouting assignedShard : assignedShards) { if (checkNode.nodeId().equals(assignedShard.currentNodeId())) { return allocation.decision(Decision.NO, NAME, - "shard cannot be allocated on same host [%s] it already exists on", node.nodeId()); + "shard cannot be allocated on the same host [%s] on which it already exists", node.nodeId()); } } } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ShardsLimitAllocationDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ShardsLimitAllocationDecider.java index 04247525f1d..eb256516353 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ShardsLimitAllocationDecider.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ShardsLimitAllocationDecider.java @@ -93,7 +93,7 @@ public class ShardsLimitAllocationDecider extends AllocationDecider { final int clusterShardLimit = this.clusterShardLimit; if (indexShardLimit <= 0 && clusterShardLimit <= 0) { - return allocation.decision(Decision.YES, NAME, "total shard limit disabled: [index: %d, cluster: %d] <= 0", + return allocation.decision(Decision.YES, NAME, "total shard limits are disabled: [index: %d, cluster: %d] <= 0", indexShardLimit, clusterShardLimit); } @@ -110,14 +110,16 @@ public class ShardsLimitAllocationDecider extends AllocationDecider { } } if (clusterShardLimit > 0 && nodeShardCount >= clusterShardLimit) { - return allocation.decision(Decision.NO, NAME, "too many shards for this node [%d], limit: [%d]", + return allocation.decision(Decision.NO, NAME, "too many shards for this node [%d], cluster-level limit per node: [%d]", nodeShardCount, clusterShardLimit); } if (indexShardLimit > 0 && indexShardCount >= indexShardLimit) { - return allocation.decision(Decision.NO, NAME, "too many shards for this index [%s] on node [%d], limit: [%d]", + return allocation.decision(Decision.NO, NAME, + "too many shards for this index [%s] on node [%d], index-level limit per node: [%d]", shardRouting.index(), indexShardCount, indexShardLimit); } - return allocation.decision(Decision.YES, NAME, "shard count under index limit [%d] and node limit [%d] of total shards per node", + return allocation.decision(Decision.YES, NAME, + "the shard count is under index limit [%d] and cluster level node limit [%d] of total shards per node", indexShardLimit, clusterShardLimit); } @@ -130,7 +132,7 @@ public class ShardsLimitAllocationDecider extends AllocationDecider { final int clusterShardLimit = this.clusterShardLimit; if (indexShardLimit <= 0 && clusterShardLimit <= 0) { - return allocation.decision(Decision.YES, NAME, "total shard limit disabled: [index: %d, cluster: %d] <= 0", + return allocation.decision(Decision.YES, NAME, "total shard limits are disabled: [index: %d, cluster: %d] <= 0", indexShardLimit, clusterShardLimit); } @@ -149,14 +151,16 @@ public class ShardsLimitAllocationDecider extends AllocationDecider { // Subtle difference between the `canAllocate` and `canRemain` is that // this checks > while canAllocate checks >= if (clusterShardLimit > 0 && nodeShardCount > clusterShardLimit) { - return allocation.decision(Decision.NO, NAME, "too many shards for this node [%d], limit: [%d]", + return allocation.decision(Decision.NO, NAME, "too many shards for this node [%d], cluster-level limit per node: [%d]", nodeShardCount, clusterShardLimit); } if (indexShardLimit > 0 && indexShardCount > indexShardLimit) { - return allocation.decision(Decision.NO, NAME, "too many shards for this index [%s] on node [%d], limit: [%d]", + return allocation.decision(Decision.NO, NAME, + "too many shards for this index [%s] on node [%d], index-level limit per node: [%d]", shardRouting.index(), indexShardCount, indexShardLimit); } - return allocation.decision(Decision.YES, NAME, "shard count under index limit [%d] and node limit [%d] of total shards per node", + return allocation.decision(Decision.YES, NAME, + "the shard count is under index limit [%d] and cluster level node limit [%d] of total shards per node", indexShardLimit, clusterShardLimit); } @@ -168,7 +172,7 @@ public class ShardsLimitAllocationDecider extends AllocationDecider { final int clusterShardLimit = this.clusterShardLimit; if (clusterShardLimit <= 0) { - return allocation.decision(Decision.YES, NAME, "total shard limit disabled: [cluster: %d] <= 0", + return allocation.decision(Decision.YES, NAME, "total shard limits are disabled: [cluster: %d] <= 0", clusterShardLimit); } @@ -181,10 +185,10 @@ public class ShardsLimitAllocationDecider extends AllocationDecider { nodeShardCount++; } if (clusterShardLimit >= 0 && nodeShardCount >= clusterShardLimit) { - return allocation.decision(Decision.NO, NAME, "too many shards for this node [%d], limit: [%d]", + return allocation.decision(Decision.NO, NAME, "too many shards for this node [%d], cluster-level limit per node: [%d]", nodeShardCount, clusterShardLimit); } - return allocation.decision(Decision.YES, NAME, "shard count under node limit [%d] of total shards per node", + return allocation.decision(Decision.YES, NAME, "the shard count is under node limit [%d] of total shards per node", clusterShardLimit); } } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/SnapshotInProgressAllocationDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/SnapshotInProgressAllocationDecider.java index d656afc8036..54cfb6407da 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/SnapshotInProgressAllocationDecider.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/SnapshotInProgressAllocationDecider.java @@ -54,7 +54,8 @@ public class SnapshotInProgressAllocationDecider extends AllocationDecider { } /** - * Creates a new {@link org.elasticsearch.cluster.routing.allocation.decider.SnapshotInProgressAllocationDecider} instance from given settings + * Creates a new {@link org.elasticsearch.cluster.routing.allocation.decider.SnapshotInProgressAllocationDecider} instance from + * given settings * * @param settings {@link org.elasticsearch.common.settings.Settings} to use */ @@ -66,7 +67,8 @@ public class SnapshotInProgressAllocationDecider extends AllocationDecider { public SnapshotInProgressAllocationDecider(Settings settings, ClusterSettings clusterSettings) { super(settings); enableRelocation = CLUSTER_ROUTING_ALLOCATION_SNAPSHOT_RELOCATION_ENABLED_SETTING.get(settings); - clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_SNAPSHOT_RELOCATION_ENABLED_SETTING, this::setEnableRelocation); + clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_SNAPSHOT_RELOCATION_ENABLED_SETTING, + this::setEnableRelocation); } private void setEnableRelocation(boolean enableRelocation) { @@ -104,14 +106,18 @@ public class SnapshotInProgressAllocationDecider extends AllocationDecider { for (SnapshotsInProgress.Entry snapshot : snapshotsInProgress.entries()) { SnapshotsInProgress.ShardSnapshotStatus shardSnapshotStatus = snapshot.shards().get(shardRouting.shardId()); - if (shardSnapshotStatus != null && !shardSnapshotStatus.state().completed() && shardSnapshotStatus.nodeId() != null && shardSnapshotStatus.nodeId().equals(shardRouting.currentNodeId())) { - logger.trace("Preventing snapshotted shard [{}] to be moved from node [{}]", shardRouting.shardId(), shardSnapshotStatus.nodeId()); + if (shardSnapshotStatus != null && !shardSnapshotStatus.state().completed() && shardSnapshotStatus.nodeId() != null && + shardSnapshotStatus.nodeId().equals(shardRouting.currentNodeId())) { + if (logger.isTraceEnabled()) { + logger.trace("Preventing snapshotted shard [{}] to be moved from node [{}]", + shardRouting.shardId(), shardSnapshotStatus.nodeId()); + } return allocation.decision(Decision.NO, NAME, "snapshot for shard [%s] is currently running on node [%s]", shardRouting.shardId(), shardSnapshotStatus.nodeId()); } } } - return allocation.decision(Decision.YES, NAME, "shard not primary or relocation disabled"); + return allocation.decision(Decision.YES, NAME, "the shard is not primary or relocation is disabled"); } } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java index ca6b312da4c..6eb44351c7a 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java @@ -84,11 +84,16 @@ public class ThrottlingAllocationDecider extends AllocationDecider { concurrentIncomingRecoveries = CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING.get(settings); concurrentOutgoingRecoveries = CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING.get(settings); - clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING, this::setPrimariesInitialRecoveries); - clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING, this::setConcurrentIncomingRecoverries); - clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING, this::setConcurrentOutgoingRecoverries); + clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING, + this::setPrimariesInitialRecoveries); + clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING, + this::setConcurrentIncomingRecoverries); + clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING, + this::setConcurrentOutgoingRecoverries); - logger.debug("using node_concurrent_outgoing_recoveries [{}], node_concurrent_incoming_recoveries [{}], node_initial_primaries_recoveries [{}]", concurrentOutgoingRecoveries, concurrentIncomingRecoveries, primariesInitialRecoveries); + logger.debug("using node_concurrent_outgoing_recoveries [{}], node_concurrent_incoming_recoveries [{}], " + + "node_initial_primaries_recoveries [{}]", + concurrentOutgoingRecoveries, concurrentIncomingRecoveries, primariesInitialRecoveries); } private void setConcurrentIncomingRecoverries(int concurrentIncomingRecoveries) { @@ -118,7 +123,7 @@ public class ThrottlingAllocationDecider extends AllocationDecider { } } if (primariesInRecovery >= primariesInitialRecoveries) { - return allocation.decision(Decision.THROTTLE, NAME, "too many primaries currently recovering [%d], limit: [%d]", + return allocation.decision(Decision.THROTTLE, NAME, "too many primaries are currently recovering [%d], limit: [%d]", primariesInRecovery, primariesInitialRecoveries); } else { return allocation.decision(Decision.YES, NAME, "below primary recovery limit of [%d]", primariesInitialRecoveries); @@ -137,13 +142,17 @@ public class ThrottlingAllocationDecider extends AllocationDecider { int currentOutRecoveries = allocation.routingNodes().getOutgoingRecoveries(node.nodeId()); int currentInRecoveries = allocation.routingNodes().getIncomingRecoveries(node.nodeId()); if (currentOutRecoveries >= concurrentOutgoingRecoveries) { - return allocation.decision(Decision.THROTTLE, NAME, "too many outgoing shards currently recovering [%d], limit: [%d]", + return allocation.decision(Decision.THROTTLE, NAME, "too many outgoing shards are currently recovering [%d], limit: [%d]", currentOutRecoveries, concurrentOutgoingRecoveries); } else if (currentInRecoveries >= concurrentIncomingRecoveries) { - return allocation.decision(Decision.THROTTLE, NAME, "too many incoming shards currently recovering [%d], limit: [%d]", + return allocation.decision(Decision.THROTTLE, NAME, "too many incoming shards are currently recovering [%d], limit: [%d]", currentInRecoveries, concurrentIncomingRecoveries); } else { - return allocation.decision(Decision.YES, NAME, "below shard recovery limit of outgoing: [%d] incoming: [%d]", concurrentOutgoingRecoveries, concurrentIncomingRecoveries); + return allocation.decision(Decision.YES, NAME, "below shard recovery limit of outgoing: [%d < %d] incoming: [%d < %d]", + currentOutRecoveries, + concurrentOutgoingRecoveries, + currentInRecoveries, + concurrentIncomingRecoveries); } } } diff --git a/core/src/main/java/org/elasticsearch/common/network/NetworkModule.java b/core/src/main/java/org/elasticsearch/common/network/NetworkModule.java index 712bdbe99ab..4c32abe8156 100644 --- a/core/src/main/java/org/elasticsearch/common/network/NetworkModule.java +++ b/core/src/main/java/org/elasticsearch/common/network/NetworkModule.java @@ -38,6 +38,7 @@ import org.elasticsearch.http.HttpServerTransport; import org.elasticsearch.http.netty.NettyHttpServerTransport; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestHandler; +import org.elasticsearch.rest.action.admin.cluster.allocation.RestClusterAllocationExplainAction; import org.elasticsearch.rest.action.admin.cluster.health.RestClusterHealthAction; import org.elasticsearch.rest.action.admin.cluster.node.hotthreads.RestNodesHotThreadsAction; import org.elasticsearch.rest.action.admin.cluster.node.info.RestNodesInfoAction; @@ -171,6 +172,7 @@ public class NetworkModule extends AbstractModule { RestNodesInfoAction.class, RestNodesStatsAction.class, RestNodesHotThreadsAction.class, + RestClusterAllocationExplainAction.class, RestClusterStatsAction.class, RestClusterStateAction.class, RestClusterHealthAction.class, diff --git a/core/src/main/java/org/elasticsearch/rest/action/admin/cluster/allocation/RestClusterAllocationExplainAction.java b/core/src/main/java/org/elasticsearch/rest/action/admin/cluster/allocation/RestClusterAllocationExplainAction.java new file mode 100644 index 00000000000..06ba2a9be87 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/rest/action/admin/cluster/allocation/RestClusterAllocationExplainAction.java @@ -0,0 +1,89 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.rest.action.admin.cluster.allocation; + +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainRequest; +import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.Requests; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.BytesRestResponse; +import org.elasticsearch.rest.RestChannel; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.RestResponse; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.rest.action.support.RestActions; +import org.elasticsearch.rest.action.support.RestBuilderListener; + +import java.io.IOException; + +/** + * Class handling cluster allocation explanation at the REST level + */ +public class RestClusterAllocationExplainAction extends BaseRestHandler { + + @Inject + public RestClusterAllocationExplainAction(Settings settings, RestController controller, Client client) { + super(settings, client); + controller.registerHandler(RestRequest.Method.GET, "/_cluster/allocation/explain", this); + controller.registerHandler(RestRequest.Method.POST, "/_cluster/allocation/explain", this); + } + + @Override + public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) { + ClusterAllocationExplainRequest req; + if (RestActions.hasBodyContent(request) == false) { + // Empty request signals "explain the first unassigned shard you find" + req = new ClusterAllocationExplainRequest(); + } else { + BytesReference content = RestActions.getRestContent(request); + try (XContentParser parser = XContentFactory.xContent(content).createParser(content)) { + req = ClusterAllocationExplainRequest.parse(parser); + } catch (IOException e) { + logger.debug("failed to parse allocation explain request", e); + channel.sendResponse(new BytesRestResponse(ExceptionsHelper.status(e))); + return; + } + } + + try { + req.includeYesDecisions(request.paramAsBoolean("include_yes_decisions", false)); + client.admin().cluster().allocationExplain(req, new RestBuilderListener(channel) { + @Override + public RestResponse buildResponse(ClusterAllocationExplainResponse response, XContentBuilder builder) throws Exception { + response.getExplanation().toXContent(builder, ToXContent.EMPTY_PARAMS); + return new BytesRestResponse(RestStatus.OK, builder); + } + }); + } catch (Exception e) { + logger.error("failed to explain allocation", e); + channel.sendResponse(new BytesRestResponse(ExceptionsHelper.status(e))); + } + } +} diff --git a/core/src/test/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainTests.java b/core/src/test/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainTests.java new file mode 100644 index 00000000000..95415ecdbd1 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainTests.java @@ -0,0 +1,86 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.cluster.allocation; + +import org.elasticsearch.client.Requests; +import org.elasticsearch.cluster.routing.allocation.decider.Decision; +import org.elasticsearch.test.ESSingleNodeTestCase; + + +/** + * Tests for the cluster allocation explanation + */ +public final class ClusterAllocationExplainTests extends ESSingleNodeTestCase { + + public void testShardExplain() throws Exception { + client().admin().indices().prepareCreate("test") + .setSettings("index.number_of_shards", 1, "index.number_of_replicas", 1).get(); + client().admin().cluster().health(Requests.clusterHealthRequest("test").waitForYellowStatus()).get(); + ClusterAllocationExplainResponse resp = client().admin().cluster().prepareAllocationExplain() + .setIndex("test").setShard(0).setPrimary(false).get(); + + ClusterAllocationExplanation cae = resp.getExplanation(); + assertNotNull("should always have an explanation", cae); + assertEquals("test", cae.getShard().getIndexName()); + assertEquals(0, cae.getShard().getId()); + assertEquals(false, cae.isPrimary()); + assertNull(cae.getAssignedNodeId()); + assertNotNull(cae.getUnassignedInfo()); + Decision d = cae.getNodeDecisions().values().iterator().next(); + assertNotNull("should have a decision", d); + assertEquals(Decision.Type.NO, d.type()); + assertTrue(d.toString(), d.toString().contains("NO(the shard cannot be allocated on the same node id")); + assertTrue(d instanceof Decision.Multi); + Decision.Multi md = (Decision.Multi) d; + Decision ssd = md.getDecisions().get(0); + assertEquals(Decision.Type.NO, ssd.type()); + assertTrue(ssd.toString(), ssd.toString().contains("NO(the shard cannot be allocated on the same node id")); + Float weight = cae.getNodeWeights().values().iterator().next(); + assertNotNull("should have a weight", weight); + + resp = client().admin().cluster().prepareAllocationExplain().setIndex("test").setShard(0).setPrimary(true).get(); + + cae = resp.getExplanation(); + assertNotNull("should always have an explanation", cae); + assertEquals("test", cae.getShard().getIndexName()); + assertEquals(0, cae.getShard().getId()); + assertEquals(true, cae.isPrimary()); + assertNotNull("shard should have assigned node id", cae.getAssignedNodeId()); + assertNull("assigned shard should not have unassigned info", cae.getUnassignedInfo()); + d = cae.getNodeDecisions().values().iterator().next(); + assertNotNull("should have a decision", d); + assertEquals(Decision.Type.NO, d.type()); + assertTrue(d.toString(), d.toString().contains("NO(the shard cannot be allocated on the same node id")); + assertTrue(d instanceof Decision.Multi); + md = (Decision.Multi) d; + ssd = md.getDecisions().get(0); + assertEquals(Decision.Type.NO, ssd.type()); + assertTrue(ssd.toString(), ssd.toString().contains("NO(the shard cannot be allocated on the same node id")); + weight = cae.getNodeWeights().values().iterator().next(); + assertNotNull("should have a weight", weight); + + resp = client().admin().cluster().prepareAllocationExplain().useAnyUnassignedShard().get(); + cae = resp.getExplanation(); + assertNotNull("should always have an explanation", cae); + assertEquals("test", cae.getShard().getIndexName()); + assertEquals(0, cae.getShard().getId()); + assertEquals(false, cae.isPrimary()); + } +} diff --git a/core/src/test/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplanationTests.java b/core/src/test/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplanationTests.java new file mode 100644 index 00000000000..060fb73fbf8 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplanationTests.java @@ -0,0 +1,91 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.cluster.allocation; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; +import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider; +import org.elasticsearch.cluster.routing.allocation.decider.Decision; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.DummyTransportAddress; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.test.ESTestCase; + +import java.util.HashMap; +import java.util.Map; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; + +/** + * Tests for the cluster allocation explanation + */ +public final class ClusterAllocationExplanationTests extends ESTestCase { + + public void testDecisionEquality() { + Decision.Multi d = new Decision.Multi(); + Decision.Multi d2 = new Decision.Multi(); + d.add(Decision.single(Decision.Type.NO, "no label", "because I said no")); + d.add(Decision.single(Decision.Type.YES, "yes label", "yes please")); + d.add(Decision.single(Decision.Type.THROTTLE, "throttle label", "wait a sec")); + d2.add(Decision.single(Decision.Type.NO, "no label", "because I said no")); + d2.add(Decision.single(Decision.Type.YES, "yes label", "yes please")); + d2.add(Decision.single(Decision.Type.THROTTLE, "throttle label", "wait a sec")); + assertEquals(d, d2); + } + + public void testExplanationSerialization() throws Exception { + ShardId shard = new ShardId("test", "uuid", 0); + Map nodeToDecisions = new HashMap<>(); + Map nodeToWeight = new HashMap<>(); + for (int i = randomIntBetween(2, 5); i > 0; i--) { + DiscoveryNode dn = new DiscoveryNode("node-" + i, DummyTransportAddress.INSTANCE, Version.CURRENT); + Decision.Multi d = new Decision.Multi(); + d.add(Decision.single(Decision.Type.NO, "no label", "because I said no")); + d.add(Decision.single(Decision.Type.YES, "yes label", "yes please")); + d.add(Decision.single(Decision.Type.THROTTLE, "throttle label", "wait a sec")); + nodeToDecisions.put(dn, d); + nodeToWeight.put(dn, randomFloat()); + } + + ClusterAllocationExplanation cae = new ClusterAllocationExplanation(shard, true, "assignedNode", null, + nodeToDecisions, nodeToWeight); + BytesStreamOutput out = new BytesStreamOutput(); + cae.writeTo(out); + StreamInput in = StreamInput.wrap(out.bytes()); + ClusterAllocationExplanation cae2 = new ClusterAllocationExplanation(in); + assertEquals(shard, cae2.getShard()); + assertTrue(cae2.isPrimary()); + assertTrue(cae2.isAssigned()); + assertEquals("assignedNode", cae2.getAssignedNodeId()); + assertNull(cae2.getUnassignedInfo()); + for (Map.Entry entry : cae2.getNodeDecisions().entrySet()) { + assertEquals(nodeToDecisions.get(entry.getKey()), entry.getValue()); + } + assertEquals(nodeToWeight, cae2.getNodeWeights()); + } +} diff --git a/core/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java b/core/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java index 4a930bc9c28..016f70f51bd 100644 --- a/core/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java @@ -22,6 +22,7 @@ package org.elasticsearch.cluster; import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest; import org.elasticsearch.cluster.metadata.IndexTemplateFilter; import org.elasticsearch.cluster.metadata.IndexTemplateMetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation; @@ -39,6 +40,8 @@ import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsModule; +import java.util.HashMap; +import java.util.Map; public class ClusterModuleTests extends ModuleTestCase { public static class FakeAllocationDecider extends AllocationDecider { @@ -52,6 +55,11 @@ public class ClusterModuleTests extends ModuleTestCase { public boolean allocate(RoutingAllocation allocation) { return false; } + + @Override + public Map weighShard(RoutingAllocation allocation, ShardRouting shard) { + return new HashMap<>(); + } } static class FakeIndexTemplateFilter implements IndexTemplateFilter { diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/BalanceConfigurationTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/BalanceConfigurationTests.java index 56a66b52d6f..f1495bb5e7b 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/BalanceConfigurationTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/BalanceConfigurationTests.java @@ -33,6 +33,7 @@ import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator; import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider; @@ -44,6 +45,8 @@ import org.elasticsearch.test.ESAllocationTestCase; import org.elasticsearch.test.gateway.NoopGatewayAllocator; import org.hamcrest.Matchers; +import java.util.HashMap; +import java.util.Map; import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED; import static org.elasticsearch.common.settings.Settings.settingsBuilder; @@ -313,6 +316,9 @@ public class BalanceConfigurationTests extends ESAllocationTestCase { new ClusterSettings(Settings.Builder.EMPTY_SETTINGS, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), getRandom()), NoopGatewayAllocator.INSTANCE, new ShardsAllocator() { + public Map weighShard(RoutingAllocation allocation, ShardRouting shard) { + return new HashMap(); + } /* * // this allocator tries to rebuild this scenario where a rebalance is * // triggered solely by the primary overload on node [1] where a shard diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java index 928756fec01..260a33780a7 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java @@ -59,6 +59,7 @@ import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING; import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED; import static org.elasticsearch.cluster.routing.ShardRoutingState.UNASSIGNED; import static org.elasticsearch.common.settings.Settings.settingsBuilder; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; @@ -794,7 +795,8 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { fail("should not have been able to reroute the shard"); } catch (IllegalArgumentException e) { assertThat("can't allocated because there isn't enough room: " + e.getMessage(), - e.getMessage().contains("more than allowed [70.0%] used disk on node, free: [26.0%]"), equalTo(true)); + e.getMessage(), + containsString("the node is above the low watermark and has more than allowed [70.0%] used disk, free: [26.0%]")); } } diff --git a/docs/reference/cluster.asciidoc b/docs/reference/cluster.asciidoc index 4c823119daf..d294d1092b9 100644 --- a/docs/reference/cluster.asciidoc +++ b/docs/reference/cluster.asciidoc @@ -46,3 +46,5 @@ include::cluster/nodes-stats.asciidoc[] include::cluster/nodes-info.asciidoc[] include::cluster/nodes-hot-threads.asciidoc[] + +include::cluster/allocation-explain.asciidoc[] diff --git a/docs/reference/cluster/allocation-explain.asciidoc b/docs/reference/cluster/allocation-explain.asciidoc new file mode 100644 index 00000000000..dcca80dd61f --- /dev/null +++ b/docs/reference/cluster/allocation-explain.asciidoc @@ -0,0 +1,159 @@ +[[cluster-allocation-explain]] +== Cluster Allocation Explain API + +The cluster allocation explanation API is designed to assist in answering the +question "why is this shard unassigned?". To explain the allocation (on +unassigned state) of a shard, issue a request like: + +[source,js] +-------------------------------------------------- +$ curl -XGET 'http://localhost:9200/_cluster/allocation/explain' -d'{ + "index": "myindex", + "shard": 0, + "primary": false +}' +-------------------------------------------------- + +Specify the `index` and `shard` id of the shard you would like an explanation +for, as well as the `primary` flag to indicate whether to explain a primary or +replica shard. + +The response looks like: + +[source,js] +-------------------------------------------------- +{ + "shard" : { + "index" : "myindex", + "index_uuid" : "KnW0-zELRs6PK84l0r38ZA", + "id" : 0, + "primary" : false + }, + "assigned" : false, <1> + "unassigned_info" : { + "reason" : "INDEX_CREATED", <2> + "at" : "2016-03-22T20:04:23.620Z" + }, + "nodes" : { <3> + "V-Spi0AyRZ6ZvKbaI3691w" : { + "node_name" : "node1", + "node_attributes" : { <4> + "bar" : "baz" + }, + "final_decision" : "NO", <5> + "weight" : 0.06666675, <6> + "decisions" : [ { <7> + "decider" : "filter", + "decision" : "NO", + "explanation" : "node does not match index include filters [foo:\"bar\"]" + } ] + }, + "Qc6VL8c5RWaw1qXZ0Rg57g" : { + "node_name" : "node2", + "node_attributes" : { + "bar" : "baz", + "foo" : "bar" + }, + "final_decision" : "NO", + "weight" : -1.3833332, + "decisions" : [ { + "decider" : "same_shard", + "decision" : "NO", + "explanation" : "the shard cannot be allocated on the same node id [Qc6VL8c5RWaw1qXZ0Rg57g] on which it already exists" + } ] + }, + "PzdyMZGXQdGhqTJHF_hGgA" : { + "node_name" : "node3", + "node_attributes" : { }, + "final_decision" : "NO", + "weight" : 2.3166666, + "decisions" : [ { + "decider" : "filter", + "decision" : "NO", + "explanation" : "node does not match index include filters [foo:\"bar\"]" + } ] + } + } +} +-------------------------------------------------- +<1> Whether the shard is assigned or unassigned +<2> Reason for the shard originally becoming unassigned +<3> List of node decisions about the shard +<4> User-added attributes the node has +<5> Final decision for whether the shard is allowed to be allocated to this node +<6> Weight for how much the allocator would like to allocate the shard to this node +<7> List of decisions factoring into final decision + +For a shard that is already assigned, the output looks similar to: + +[source,js] +-------------------------------------------------- +{ + "shard" : { + "index" : "only-foo", + "index_uuid" : "KnW0-zELRs6PK84l0r38ZA", + "id" : 0, + "primary" : true + }, + "assigned" : true, + "assigned_node_id" : "Qc6VL8c5RWaw1qXZ0Rg57g", <1> + "nodes" : { + "V-Spi0AyRZ6ZvKbaI3691w" : { + "node_name" : "Susan Storm", + "node_attributes" : { + "bar" : "baz" + }, + "final_decision" : "NO", + "weight" : 1.4499999, + "decisions" : [ { + "decider" : "filter", + "decision" : "NO", + "explanation" : "node does not match index include filters [foo:\"bar\"]" + } ] + }, + "Qc6VL8c5RWaw1qXZ0Rg57g" : { + "node_name" : "Slipstream", + "node_attributes" : { + "bar" : "baz", + "foo" : "bar" + }, + "final_decision" : "CURRENTLY_ASSIGNED", <2> + "weight" : 0.0, + "decisions" : [ { + "decider" : "same_shard", + "decision" : "NO", + "explanation" : "the shard cannot be allocated on the same node id [Qc6VL8c5RWaw1qXZ0Rg57g] on which it already exists" + } ] + }, + "PzdyMZGXQdGhqTJHF_hGgA" : { + "node_name" : "The Symbiote", + "node_attributes" : { }, + "final_decision" : "NO", + "weight" : 3.6999998, + "decisions" : [ { + "decider" : "filter", + "decision" : "NO", + "explanation" : "node does not match index include filters [foo:\"bar\"]" + } ] + } + } +} +-------------------------------------------------- +<1> Node the shard is currently assigned to +<2> The decision is "CURRENTLY_ASSIGNED" because the shard is currently assigned to this node + +You can also have Elasticsearch explain the allocation of the first unassigned +shard it finds by sending an empty body, such as: + +[source,js] +-------------------------------------------------- +$ curl -XGET 'http://localhost:9200/_cluster/allocation/explain' +-------------------------------------------------- + +And if you would like to include all decisions that were factored into the final +decision, the `include_yes_decisions` parameter will return all decisions: + +[source,js] +-------------------------------------------------- +$ curl -XGET 'http://localhost:9200/_cluster/allocation/explain?include_yes_decisions=true' +-------------------------------------------------- diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/cluster.allocation_explain.json b/rest-api-spec/src/main/resources/rest-api-spec/api/cluster.allocation_explain.json new file mode 100644 index 00000000000..505c163497e --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/cluster.allocation_explain.json @@ -0,0 +1,20 @@ +{ + "cluster.allocation_explain": { + "documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/master/cluster-allocation-explain.html", + "methods": ["GET", "POST"], + "url": { + "path": "/_cluster/allocation/explain", + "paths": ["/_cluster/allocation/explain"], + "parts": {}, + "params": { + "include_yes_decisions": { + "type": "boolean", + "description": "Return 'YES' decisions in explanation (default: false)" + } + } + }, + "body": { + "description": "The index, shard, and primary flag to explain. Empty means 'explain the first unassigned shard'" + } + } +} diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/cluster.allocation_explain/10_basic.yaml b/rest-api-spec/src/main/resources/rest-api-spec/test/cluster.allocation_explain/10_basic.yaml new file mode 100644 index 00000000000..0163ffae3ef --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/cluster.allocation_explain/10_basic.yaml @@ -0,0 +1,76 @@ +--- +"cluster shard allocation explanation test": + - do: + # there aren't any unassigned shards to explain + catch: /unable to find any shards to explain/ + cluster.allocation_explain: {} + + - do: + indices.create: + index: test + + - do: + cluster.health: + wait_for_status: yellow + + - do: + cluster.state: + metric: [ master_node ] + + - set: {master_node: node_id} + + # This relies on there only being a single node in the test cluster, which + # is currently true, but if this changes in the future this test will need + # to be changed + - do: + cluster.allocation_explain: + body: { "index": "test", "shard": 0, "primary": true } + + - match: { assigned: true } + # - match: { assigned_node_id: $node_id } + - is_true: assigned_node_id + - match: { shard.index: "test" } + - match: { shard.id: 0 } + - match: { shard.primary: true } + # unfortunately can't test these because they break with multi-node backwords + # compat REST tests + # - is_true: nodes.$node_id.node_name + # - match: { nodes.$node_id.node_attributes.testattr: "test" } + # - match: { nodes.$node_id.node_attributes.portsfile: "true" } + # - match: { nodes.$node_id.final_decision: "CURRENTLY_ASSIGNED" } + # - match: { nodes.$node_id.weight: 0.0 } + # - match: { nodes.$node_id.decisions.0.decider: "same_shard" } + # - match: { nodes.$node_id.decisions.0.decision: "NO" } + +--- +"cluster shard allocation explanation test with empty request": + - do: + indices.create: + index: test + body: { "index.number_of_shards": 1, "index.number_of_replicas": 9 } + + - do: + cluster.health: + wait_for_status: yellow + + - do: + cluster.state: + metric: [ master_node ] + + - set: {master_node: node_id} + + - do: + cluster.allocation_explain: {} + + - match: { assigned: false } + - match: { unassigned_info.reason: "INDEX_CREATED" } + - is_true: unassigned_info.at + - match: { shard.index: "test" } + - match: { shard.id: 0 } + - match: { shard.primary: false } + # - is_true: nodes.$node_id.node_name + # - match: { nodes.$node_id.node_attributes.testattr: "test" } + # - match: { nodes.$node_id.node_attributes.portsfile: "true" } + # - match: { nodes.$node_id.final_decision: "NO" } + # - match: { nodes.$node_id.decisions.0.decider: "same_shard" } + # - match: { nodes.$node_id.decisions.0.decision: "NO" } diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/parser/RestTestSuiteParser.java b/test/framework/src/main/java/org/elasticsearch/test/rest/parser/RestTestSuiteParser.java index ba1b99288e3..fa9c5cf099a 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/rest/parser/RestTestSuiteParser.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/parser/RestTestSuiteParser.java @@ -73,7 +73,7 @@ public class RestTestSuiteParser implements RestTestFragmentParser