Merge remote-tracking branch 'dakrone/allocation-explain'

This commit is contained in:
Lee Hinman 2016-03-28 16:06:49 -06:00
commit c63cb21745
39 changed files with 1629 additions and 111 deletions

View File

@ -326,17 +326,6 @@
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]routing[/\\]allocation[/\\]command[/\\]CancelAllocationCommand.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]routing[/\\]allocation[/\\]command[/\\]MoveAllocationCommand.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]routing[/\\]allocation[/\\]decider[/\\]AllocationDeciders.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]routing[/\\]allocation[/\\]decider[/\\]AwarenessAllocationDecider.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]routing[/\\]allocation[/\\]decider[/\\]ClusterRebalanceAllocationDecider.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]routing[/\\]allocation[/\\]decider[/\\]ConcurrentRebalanceAllocationDecider.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]routing[/\\]allocation[/\\]decider[/\\]DiskThresholdDecider.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]routing[/\\]allocation[/\\]decider[/\\]EnableAllocationDecider.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]routing[/\\]allocation[/\\]decider[/\\]FilterAllocationDecider.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]routing[/\\]allocation[/\\]decider[/\\]NodeVersionAllocationDecider.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]routing[/\\]allocation[/\\]decider[/\\]SameShardAllocationDecider.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]routing[/\\]allocation[/\\]decider[/\\]ShardsLimitAllocationDecider.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]routing[/\\]allocation[/\\]decider[/\\]SnapshotInProgressAllocationDecider.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]routing[/\\]allocation[/\\]decider[/\\]ThrottlingAllocationDecider.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]service[/\\]InternalClusterService.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]common[/\\]Base64.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]common[/\\]Booleans.java" checks="LineLength" />

View File

@ -19,6 +19,8 @@
package org.elasticsearch.action;
import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainAction;
import org.elasticsearch.action.admin.cluster.allocation.TransportClusterAllocationExplainAction;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction;
import org.elasticsearch.action.admin.cluster.health.TransportClusterHealthAction;
import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsAction;
@ -263,6 +265,7 @@ public class ActionModule extends AbstractModule {
registerAction(ListTasksAction.INSTANCE, TransportListTasksAction.class);
registerAction(CancelTasksAction.INSTANCE, TransportCancelTasksAction.class);
registerAction(ClusterAllocationExplainAction.INSTANCE, TransportClusterAllocationExplainAction.class);
registerAction(ClusterStatsAction.INSTANCE, TransportClusterStatsAction.class);
registerAction(ClusterStateAction.INSTANCE, TransportClusterStateAction.class);
registerAction(ClusterHealthAction.INSTANCE, TransportClusterHealthAction.class);

View File

@ -0,0 +1,48 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.allocation;
import org.elasticsearch.action.Action;
import org.elasticsearch.client.ElasticsearchClient;
/**
* Action for explaining shard allocation for a shard in the cluster
*/
public class ClusterAllocationExplainAction extends Action<ClusterAllocationExplainRequest,
ClusterAllocationExplainResponse,
ClusterAllocationExplainRequestBuilder> {
public static final ClusterAllocationExplainAction INSTANCE = new ClusterAllocationExplainAction();
public static final String NAME = "cluster:monitor/allocation/explain";
private ClusterAllocationExplainAction() {
super(NAME);
}
@Override
public ClusterAllocationExplainResponse newResponse() {
return new ClusterAllocationExplainResponse();
}
@Override
public ClusterAllocationExplainRequestBuilder newRequestBuilder(ElasticsearchClient client) {
return new ClusterAllocationExplainRequestBuilder(client, this);
}
}

View File

@ -0,0 +1,195 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.allocation;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
import java.util.Objects;
import static org.elasticsearch.action.ValidateActions.addValidationError;
/**
* A request to explain the allocation of a shard in the cluster
*/
public class ClusterAllocationExplainRequest extends MasterNodeRequest<ClusterAllocationExplainRequest> {
private String index;
private Integer shard;
private Boolean primary;
private boolean includeYesDecisions = false;
/** Explain the first unassigned shard */
public ClusterAllocationExplainRequest() {
this.index = null;
this.shard = null;
this.primary = null;
}
/**
* Create a new allocation explain request. If {@code primary} is false, the first unassigned replica
* will be picked for explanation. If no replicas are unassigned, the first assigned replica will
* be explained.
*/
public ClusterAllocationExplainRequest(String index, int shard, boolean primary) {
this.index = index;
this.shard = shard;
this.primary = primary;
}
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
if (this.useAnyUnassignedShard() == false) {
if (this.index == null) {
validationException = addValidationError("index must be specified", validationException);
}
if (this.shard == null) {
validationException = addValidationError("shard must be specified", validationException);
}
if (this.primary == null) {
validationException = addValidationError("primary must be specified", validationException);
}
}
return validationException;
}
/**
* Returns {@code true} iff the first unassigned shard is to be used
*/
public boolean useAnyUnassignedShard() {
return this.index == null && this.shard == null && this.primary == null;
}
public ClusterAllocationExplainRequest setIndex(String index) {
this.index = index;
return this;
}
@Nullable
public String getIndex() {
return this.index;
}
public ClusterAllocationExplainRequest setShard(Integer shard) {
this.shard = shard;
return this;
}
@Nullable
public int getShard() {
return this.shard;
}
public ClusterAllocationExplainRequest setPrimary(Boolean primary) {
this.primary = primary;
return this;
}
@Nullable
public boolean isPrimary() {
return this.primary;
}
public void includeYesDecisions(boolean includeYesDecisions) {
this.includeYesDecisions = includeYesDecisions;
}
/** Returns true if all decisions should be included. Otherwise only "NO" and "THROTTLE" decisions are returned */
public boolean includeYesDecisions() {
return this.includeYesDecisions;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder("ClusterAllocationExplainRequest[");
if (this.useAnyUnassignedShard()) {
sb.append("useAnyUnassignedShard=true");
} else {
sb.append("index=").append(index);
sb.append(",shard=").append(shard);
sb.append(",primary?=").append(primary);
}
sb.append(",includeYesDecisions?=").append(includeYesDecisions);
return sb.toString();
}
public static ClusterAllocationExplainRequest parse(XContentParser parser) throws IOException {
String currentFieldName = null;
String index = null;
Integer shard = null;
Boolean primary = null;
XContentParser.Token token;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token.isValue()) {
if ("index".equals(currentFieldName)) {
index = parser.text();
} else if ("shard".equals(currentFieldName)) {
shard = parser.intValue();
} else if ("primary".equals(currentFieldName)) {
primary = parser.booleanValue();
} else {
throw new ElasticsearchParseException("unexpected field [" + currentFieldName + "] in allocation explain request");
}
} else if (token == XContentParser.Token.START_OBJECT) {
// the object was started
continue;
} else {
throw new ElasticsearchParseException("unexpected token [" + token + "] in allocation explain request");
}
}
if (index == null && shard == null && primary == null) {
// If it was an empty body, use the "any unassigned shard" request
return new ClusterAllocationExplainRequest();
} else if (index == null || shard == null || primary == null) {
throw new ElasticsearchParseException("'index', 'shard', and 'primary' must be specified in allocation explain request");
}
return new ClusterAllocationExplainRequest(index, shard, primary);
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
this.index = in.readOptionalString();
this.shard = in.readOptionalVInt();
this.primary = in.readOptionalBoolean();
this.includeYesDecisions = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeOptionalString(index);
out.writeOptionalVInt(shard);
out.writeOptionalBoolean(primary);
out.writeBoolean(includeYesDecisions);
}
}

View File

@ -0,0 +1,66 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.allocation;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
/**
* Builder for requests to explain the allocation of a shard in the cluster
*/
public class ClusterAllocationExplainRequestBuilder
extends MasterNodeOperationRequestBuilder<ClusterAllocationExplainRequest,
ClusterAllocationExplainResponse,
ClusterAllocationExplainRequestBuilder> {
public ClusterAllocationExplainRequestBuilder(ElasticsearchClient client, ClusterAllocationExplainAction action) {
super(client, action, new ClusterAllocationExplainRequest());
}
/** The index name to use when finding the shard to explain */
public ClusterAllocationExplainRequestBuilder setIndex(String index) {
request.setIndex(index);
return this;
}
/** The shard number to use when finding the shard to explain */
public ClusterAllocationExplainRequestBuilder setShard(int shard) {
request.setShard(shard);
return this;
}
/** Whether the primary or replica should be explained */
public ClusterAllocationExplainRequestBuilder setPrimary(boolean primary) {
request.setPrimary(primary);
return this;
}
/**
* Signal that the first unassigned shard should be used
*/
public ClusterAllocationExplainRequestBuilder useAnyUnassignedShard() {
request.setIndex(null);
request.setShard(null);
request.setPrimary(null);
return this;
}
}

View File

@ -0,0 +1,61 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.allocation;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
/**
* Explanation response for a shard in the cluster
*/
public class ClusterAllocationExplainResponse extends ActionResponse {
private ClusterAllocationExplanation cae;
public ClusterAllocationExplainResponse() {
}
public ClusterAllocationExplainResponse(ClusterAllocationExplanation cae) {
this.cae = cae;
}
/**
* Return the explanation for shard allocation in the cluster
*/
public ClusterAllocationExplanation getExplanation() {
return this.cae;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
this.cae = new ClusterAllocationExplanation(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
cae.writeTo(out);
}
}

View File

@ -0,0 +1,200 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.allocation;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.ToXContent.Params;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.shard.ShardId;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
/**
* A {@code ClusterAllocationExplanation} is an explanation of why a shard may or may not be allocated to nodes. It also includes weights
* for where the shard is likely to be assigned. It is an immutable class
*/
public final class ClusterAllocationExplanation implements ToXContent, Writeable<ClusterAllocationExplanation> {
private final ShardId shard;
private final boolean primary;
private final String assignedNodeId;
private final Map<DiscoveryNode, Decision> nodeToDecision;
private final Map<DiscoveryNode, Float> nodeWeights;
private final UnassignedInfo unassignedInfo;
public ClusterAllocationExplanation(StreamInput in) throws IOException {
this.shard = ShardId.readShardId(in);
this.primary = in.readBoolean();
this.assignedNodeId = in.readOptionalString();
this.unassignedInfo = in.readOptionalWriteable(UnassignedInfo::new);
Map<DiscoveryNode, Decision> ntd = null;
int size = in.readVInt();
ntd = new HashMap<>(size);
for (int i = 0; i < size; i++) {
DiscoveryNode dn = DiscoveryNode.readNode(in);
Decision decision = Decision.readFrom(in);
ntd.put(dn, decision);
}
this.nodeToDecision = ntd;
Map<DiscoveryNode, Float> ntw = null;
size = in.readVInt();
ntw = new HashMap<>(size);
for (int i = 0; i < size; i++) {
DiscoveryNode dn = DiscoveryNode.readNode(in);
float weight = in.readFloat();
ntw.put(dn, weight);
}
this.nodeWeights = ntw;
}
public ClusterAllocationExplanation(ShardId shard, boolean primary, @Nullable String assignedNodeId,
UnassignedInfo unassignedInfo, Map<DiscoveryNode, Decision> nodeToDecision,
Map<DiscoveryNode, Float> nodeWeights) {
this.shard = shard;
this.primary = primary;
this.assignedNodeId = assignedNodeId;
this.unassignedInfo = unassignedInfo;
this.nodeToDecision = nodeToDecision == null ? Collections.emptyMap() : nodeToDecision;
this.nodeWeights = nodeWeights == null ? Collections.emptyMap() : nodeWeights;
}
public ShardId getShard() {
return this.shard;
}
public boolean isPrimary() {
return this.primary;
}
/** Return turn if the shard is assigned to a node */
public boolean isAssigned() {
return this.assignedNodeId != null;
}
/** Return the assigned node id or null if not assigned */
@Nullable
public String getAssignedNodeId() {
return this.assignedNodeId;
}
/** Return the unassigned info for the shard or null if the shard is assigned */
@Nullable
public UnassignedInfo getUnassignedInfo() {
return this.unassignedInfo;
}
/** Return a map of node to decision for shard allocation */
public Map<DiscoveryNode, Decision> getNodeDecisions() {
return this.nodeToDecision;
}
/**
* Return a map of node to balancer "weight" for allocation. Higher weights mean the balancer wants to allocated the shard to that node
* more
*/
public Map<DiscoveryNode, Float> getNodeWeights() {
return this.nodeWeights;
}
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(); {
builder.startObject("shard"); {
builder.field("index", shard.getIndexName());
builder.field("index_uuid", shard.getIndex().getUUID());
builder.field("id", shard.getId());
builder.field("primary", primary);
}
builder.endObject(); // end shard
builder.field("assigned", this.assignedNodeId != null);
// If assigned, show the node id of the node it's assigned to
if (assignedNodeId != null) {
builder.field("assigned_node_id", this.assignedNodeId);
}
// If we have unassigned info, show that
if (unassignedInfo != null) {
unassignedInfo.toXContent(builder, params);
}
builder.startObject("nodes");
for (Map.Entry<DiscoveryNode, Float> entry : nodeWeights.entrySet()) {
DiscoveryNode node = entry.getKey();
builder.startObject(node.getId()); {
builder.field("node_name", node.getName());
builder.startObject("node_attributes"); {
for (ObjectObjectCursor<String, String> attrKV : node.attributes()) {
builder.field(attrKV.key, attrKV.value);
}
}
builder.endObject(); // end attributes
Decision d = nodeToDecision.get(node);
if (node.getId().equals(assignedNodeId)) {
builder.field("final_decision", "CURRENTLY_ASSIGNED");
} else {
builder.field("final_decision", d.type().toString());
}
builder.field("weight", entry.getValue());
d.toXContent(builder, params);
}
builder.endObject(); // end node <uuid>
}
builder.endObject(); // end nodes
}
builder.endObject(); // end wrapping object
return builder;
}
@Override
public ClusterAllocationExplanation readFrom(StreamInput in) throws IOException {
return new ClusterAllocationExplanation(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
this.getShard().writeTo(out);
out.writeBoolean(this.isPrimary());
out.writeOptionalString(this.getAssignedNodeId());
out.writeOptionalWriteable(this.getUnassignedInfo());
Map<DiscoveryNode, Decision> ntd = this.getNodeDecisions();
out.writeVInt(ntd.size());
for (Map.Entry<DiscoveryNode, Decision> entry : ntd.entrySet()) {
entry.getKey().writeTo(out);
Decision.writeTo(entry.getValue(), out);
}
Map<DiscoveryNode, Float> ntw = this.getNodeWeights();
out.writeVInt(ntw.size());
for (Map.Entry<DiscoveryNode, Float> entry : ntw.entrySet()) {
entry.getKey().writeTo(out);
out.writeFloat(entry.getValue());
}
}
}

View File

@ -0,0 +1,194 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.allocation;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.MetaData.Custom;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingNodes.RoutingNodesIterator;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.RoutingExplanations;
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
/**
* The {@code TransportClusterAllocationExplainAction} is responsible for actually executing the explanation of a shard's allocation on the
* master node in the cluster.
*/
public class TransportClusterAllocationExplainAction
extends TransportMasterNodeAction<ClusterAllocationExplainRequest, ClusterAllocationExplainResponse> {
private final AllocationService allocationService;
private final ClusterInfoService clusterInfoService;
private final AllocationDeciders allocationDeciders;
private final ShardsAllocator shardAllocator;
@Inject
public TransportClusterAllocationExplainAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
AllocationService allocationService, ClusterInfoService clusterInfoService,
AllocationDeciders allocationDeciders, ShardsAllocator shardAllocator) {
super(settings, ClusterAllocationExplainAction.NAME, transportService, clusterService, threadPool, actionFilters,
indexNameExpressionResolver, ClusterAllocationExplainRequest::new);
this.allocationService = allocationService;
this.clusterInfoService = clusterInfoService;
this.allocationDeciders = allocationDeciders;
this.shardAllocator = shardAllocator;
}
@Override
protected String executor() {
return ThreadPool.Names.MANAGEMENT;
}
@Override
protected ClusterBlockException checkBlock(ClusterAllocationExplainRequest request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
}
@Override
protected ClusterAllocationExplainResponse newResponse() {
return new ClusterAllocationExplainResponse();
}
/**
* Return the decisions for the given {@code ShardRouting} on the given {@code RoutingNode}. If {@code includeYesDecisions} is not true,
* only non-YES (NO and THROTTLE) decisions are returned.
*/
public static Decision tryShardOnNode(ShardRouting shard, RoutingNode node, RoutingAllocation allocation, boolean includeYesDecisions) {
Decision d = allocation.deciders().canAllocate(shard, node, allocation);
if (includeYesDecisions) {
return d;
} else {
Decision.Multi nonYesDecisions = new Decision.Multi();
List<Decision> decisions = d.getDecisions();
for (Decision decision : decisions) {
if (decision.type() != Decision.Type.YES) {
nonYesDecisions.add(decision);
}
}
return nonYesDecisions;
}
}
/**
* For the given {@code ShardRouting}, return the explanation of the allocation for that shard on all nodes. If {@code
* includeYesDecisions} is true, returns all decisions, otherwise returns only 'NO' and 'THROTTLE' decisions.
*/
public static ClusterAllocationExplanation explainShard(ShardRouting shard, RoutingAllocation allocation, RoutingNodes routingNodes,
boolean includeYesDecisions, ShardsAllocator shardAllocator) {
// don't short circuit deciders, we want a full explanation
allocation.debugDecision(true);
// get the existing unassigned info if available
UnassignedInfo ui = shard.unassignedInfo();
RoutingNodesIterator iter = routingNodes.nodes();
Map<DiscoveryNode, Decision> nodeToDecision = new HashMap<>();
while (iter.hasNext()) {
RoutingNode node = iter.next();
DiscoveryNode discoNode = node.node();
if (discoNode.isDataNode()) {
Decision d = tryShardOnNode(shard, node, allocation, includeYesDecisions);
nodeToDecision.put(discoNode, d);
}
}
return new ClusterAllocationExplanation(shard.shardId(), shard.primary(), shard.currentNodeId(), ui, nodeToDecision,
shardAllocator.weighShard(allocation, shard));
}
@Override
protected void masterOperation(final ClusterAllocationExplainRequest request, final ClusterState state,
final ActionListener<ClusterAllocationExplainResponse> listener) {
final RoutingNodes routingNodes = state.getRoutingNodes();
final RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, state.nodes(),
clusterInfoService.getClusterInfo(), System.nanoTime());
ShardRouting shardRouting = null;
if (request.useAnyUnassignedShard()) {
// If we can use any shard, just pick the first unassigned one (if there are any)
RoutingNodes.UnassignedShards.UnassignedIterator ui = routingNodes.unassigned().iterator();
if (ui.hasNext()) {
shardRouting = ui.next();
}
} else {
String index = request.getIndex();
int shard = request.getShard();
if (request.isPrimary()) {
// If we're looking for the primary shard, there's only one copy, so pick it directly
shardRouting = allocation.routingTable().shardRoutingTable(index, shard).primaryShard();
} else {
// If looking for a replica, go through all the replica shards
List<ShardRouting> replicaShardRoutings = allocation.routingTable().shardRoutingTable(index, shard).replicaShards();
if (replicaShardRoutings.size() > 0) {
// Pick the first replica at the very least
shardRouting = replicaShardRoutings.get(0);
// In case there are multiple replicas where some are assigned and some aren't,
// try to find one that is unassigned at least
for (ShardRouting replica : replicaShardRoutings) {
if (replica.unassigned()) {
shardRouting = replica;
break;
}
}
}
}
}
if (shardRouting == null) {
listener.onFailure(new ElasticsearchException("unable to find any shards to explain [{}] in the routing table", request));
return;
}
logger.debug("explaining the allocation for [{}], found shard [{}]", request, shardRouting);
ClusterAllocationExplanation cae = explainShard(shardRouting, allocation, routingNodes,
request.includeYesDecisions(), shardAllocator);
listener.onResponse(new ClusterAllocationExplainResponse(cae));
}
}

View File

@ -21,6 +21,9 @@ package org.elasticsearch.client;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainRequest;
import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainRequestBuilder;
import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequestBuilder;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
@ -572,4 +575,19 @@ public interface ClusterAdminClient extends ElasticsearchClient {
* Simulates an ingest pipeline
*/
SimulatePipelineRequestBuilder prepareSimulatePipeline(BytesReference source);
/**
* Explain the allocation of a shard
*/
void allocationExplain(ClusterAllocationExplainRequest request, ActionListener<ClusterAllocationExplainResponse> listener);
/**
* Explain the allocation of a shard
*/
ActionFuture<ClusterAllocationExplainResponse> allocationExplain(ClusterAllocationExplainRequest request);
/**
* Explain the allocation of a shard
*/
ClusterAllocationExplainRequestBuilder prepareAllocationExplain();
}

View File

@ -25,6 +25,10 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainAction;
import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainRequest;
import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainRequestBuilder;
import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequestBuilder;
@ -1245,6 +1249,21 @@ public abstract class AbstractClient extends AbstractComponent implements Client
public SimulatePipelineRequestBuilder prepareSimulatePipeline(BytesReference source) {
return new SimulatePipelineRequestBuilder(this, SimulatePipelineAction.INSTANCE, source);
}
@Override
public void allocationExplain(ClusterAllocationExplainRequest request, ActionListener<ClusterAllocationExplainResponse> listener) {
execute(ClusterAllocationExplainAction.INSTANCE, request, listener);
}
@Override
public ActionFuture<ClusterAllocationExplainResponse> allocationExplain(ClusterAllocationExplainRequest request) {
return execute(ClusterAllocationExplainAction.INSTANCE, request);
}
@Override
public ClusterAllocationExplainRequestBuilder prepareAllocationExplain() {
return new ClusterAllocationExplainRequestBuilder(this, ClusterAllocationExplainAction.INSTANCE);
}
}
static class IndicesAdmin implements IndicesAdminClient {

View File

@ -139,7 +139,7 @@ public class UnassignedInfo implements ToXContent, Writeable<UnassignedInfo> {
assert !(message == null && failure != null) : "provide a message if a failure exception is provided";
}
UnassignedInfo(StreamInput in) throws IOException {
public UnassignedInfo(StreamInput in) throws IOException {
this.reason = Reason.values()[(int) in.readByte()];
this.unassignedTimeMillis = in.readLong();
// As System.nanoTime() cannot be compared across different JVMs, reset it to now.

View File

@ -23,6 +23,7 @@ import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.IntroSorter;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
@ -100,6 +101,12 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
this.threshold = threshold;
}
@Override
public Map<DiscoveryNode, Float> weighShard(RoutingAllocation allocation, ShardRouting shard) {
final Balancer balancer = new Balancer(logger, allocation, weightFunction, threshold);
return balancer.weighShard(shard);
}
@Override
public boolean allocate(RoutingAllocation allocation) {
if (allocation.routingNodes().size() == 0) {
@ -298,6 +305,29 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
return balanceByWeights();
}
public Map<DiscoveryNode, Float> weighShard(ShardRouting shard) {
final NodeSorter sorter = newNodeSorter();
final ModelNode[] modelNodes = sorter.modelNodes;
final float[] weights = sorter.weights;
buildWeightOrderedIndices(sorter);
Map<DiscoveryNode, Float> nodes = new HashMap<>(modelNodes.length);
float currentNodeWeight = 0.0f;
for (int i = 0; i < modelNodes.length; i++) {
if (modelNodes[i].getNodeId().equals(shard.currentNodeId())) {
// If a node was found with the shard, use that weight instead of 0.0
currentNodeWeight = weights[i];
break;
}
}
for (int i = 0; i < modelNodes.length; i++) {
final float delta = currentNodeWeight - weights[i];
nodes.put(modelNodes[i].getRoutingNode().node(), delta);
}
return nodes;
}
/**
* Balances the nodes on the cluster model according to the weight
* function. The configured threshold is the minimum delta between the

View File

@ -19,8 +19,11 @@
package org.elasticsearch.cluster.routing.allocation.allocator;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import java.util.Map;
/**
* <p>
* A {@link ShardsAllocator} is the main entry point for shard allocation on nodes in the cluster.
@ -40,4 +43,15 @@ public interface ShardsAllocator {
* @return <code>true</code> if the allocation has changed, otherwise <code>false</code>
*/
boolean allocate(RoutingAllocation allocation);
/**
* Returns a map of node to a float "weight" of where the allocator would like to place the shard.
* Higher weights signify greater desire to place the shard on that node.
* Does not modify the allocation at all.
*
* @param allocation current node allocation
* @param shard shard to weigh
* @return map of nodes to float weights
*/
Map<DiscoveryNode, Float> weighShard(RoutingAllocation allocation, ShardRouting shard);
}

View File

@ -110,7 +110,8 @@ public class AwarenessAllocationDecider extends AllocationDecider {
this.awarenessAttributes = CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING, this::setAwarenessAttributes);
setForcedAwarenessAttributes(CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING.get(settings));
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING, this::setForcedAwarenessAttributes);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING,
this::setForcedAwarenessAttributes);
}
private void setForcedAwarenessAttributes(Settings forceSettings) {
@ -150,7 +151,7 @@ public class AwarenessAllocationDecider extends AllocationDecider {
private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation, boolean moveToNode) {
if (awarenessAttributes.length == 0) {
return allocation.decision(Decision.YES, NAME, "no allocation awareness enabled");
return allocation.decision(Decision.YES, NAME, "allocation awareness is not enabled");
}
IndexMetaData indexMetaData = allocation.metaData().getIndexSafe(shardRouting.index());
@ -158,7 +159,7 @@ public class AwarenessAllocationDecider extends AllocationDecider {
for (String awarenessAttribute : awarenessAttributes) {
// the node the shard exists on must be associated with an awareness attribute
if (!node.node().attributes().containsKey(awarenessAttribute)) {
return allocation.decision(Decision.NO, NAME, "node does not contain awareness attribute: [%s]", awarenessAttribute);
return allocation.decision(Decision.NO, NAME, "node does not contain the awareness attribute: [%s]", awarenessAttribute);
}
// build attr_value -> nodes map
@ -180,7 +181,8 @@ public class AwarenessAllocationDecider extends AllocationDecider {
String nodeId = shardRouting.relocating() ? shardRouting.relocatingNodeId() : shardRouting.currentNodeId();
if (!node.nodeId().equals(nodeId)) {
// we work on different nodes, move counts around
shardPerAttribute.putOrAdd(allocation.routingNodes().node(nodeId).node().attributes().get(awarenessAttribute), 0, -1);
shardPerAttribute.putOrAdd(allocation.routingNodes().node(nodeId).node().attributes().get(awarenessAttribute),
0, -1);
shardPerAttribute.addTo(node.node().attributes().get(awarenessAttribute), 1);
}
} else {
@ -215,8 +217,15 @@ public class AwarenessAllocationDecider extends AllocationDecider {
// if we are above with leftover, then we know we are not good, even with mod
if (currentNodeCount > (requiredCountPerAttribute + leftoverPerAttribute)) {
return allocation.decision(Decision.NO, NAME,
"too many shards on node for attribute: [%s], required per attribute: [%d], node count: [%d], leftover: [%d]",
awarenessAttribute, requiredCountPerAttribute, currentNodeCount, leftoverPerAttribute);
"there are too many shards on the node for attribute [%s], there are [%d] total shards for the index " +
" and [%d] total attributes values, expected the node count [%d] to be lower or equal to the required " +
"number of shards per attribute [%d] plus leftover [%d]",
awarenessAttribute,
shardCount,
numberOfAttributes,
currentNodeCount,
requiredCountPerAttribute,
leftoverPerAttribute);
}
// all is well, we are below or same as average
if (currentNodeCount <= requiredCountPerAttribute) {
@ -224,6 +233,6 @@ public class AwarenessAllocationDecider extends AllocationDecider {
}
}
return allocation.decision(Decision.YES, NAME, "node meets awareness requirements");
return allocation.decision(Decision.YES, NAME, "node meets all awareness attribute requirements");
}
}

View File

@ -78,7 +78,8 @@ public class ClusterRebalanceAllocationDecider extends AllocationDecider {
} else if ("indices_all_active".equalsIgnoreCase(typeString) || "indicesAllActive".equalsIgnoreCase(typeString)) {
return ClusterRebalanceType.INDICES_ALL_ACTIVE;
}
throw new IllegalArgumentException("Illegal value for " + CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING + ": " + typeString);
throw new IllegalArgumentException("Illegal value for " +
CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING + ": " + typeString);
}
}
@ -90,10 +91,13 @@ public class ClusterRebalanceAllocationDecider extends AllocationDecider {
try {
type = CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.get(settings);
} catch (IllegalStateException e) {
logger.warn("[{}] has a wrong value {}, defaulting to 'indices_all_active'", CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING, CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getRaw(settings));
logger.warn("[{}] has a wrong value {}, defaulting to 'indices_all_active'",
CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING,
CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getRaw(settings));
type = ClusterRebalanceType.INDICES_ALL_ACTIVE;
}
logger.debug("using [{}] with [{}]", CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), type.toString().toLowerCase(Locale.ROOT));
logger.debug("using [{}] with [{}]", CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(),
type.toString().toLowerCase(Locale.ROOT));
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING, this::setType);
}
@ -112,11 +116,13 @@ public class ClusterRebalanceAllocationDecider extends AllocationDecider {
if (type == ClusterRebalanceType.INDICES_PRIMARIES_ACTIVE) {
// check if there are unassigned primaries.
if ( allocation.routingNodes().hasUnassignedPrimaries() ) {
return allocation.decision(Decision.NO, NAME, "cluster has unassigned primary shards");
return allocation.decision(Decision.NO, NAME,
"the cluster has unassigned primary shards and rebalance type is set to [%s]", type);
}
// check if there are initializing primaries that don't have a relocatingNodeId entry.
if ( allocation.routingNodes().hasInactivePrimaries() ) {
return allocation.decision(Decision.NO, NAME, "cluster has inactive primary shards");
return allocation.decision(Decision.NO, NAME,
"the cluster has inactive primary shards and rebalance type is set to [%s]", type);
}
return allocation.decision(Decision.YES, NAME, "all primary shards are active");
@ -124,15 +130,17 @@ public class ClusterRebalanceAllocationDecider extends AllocationDecider {
if (type == ClusterRebalanceType.INDICES_ALL_ACTIVE) {
// check if there are unassigned shards.
if (allocation.routingNodes().hasUnassignedShards() ) {
return allocation.decision(Decision.NO, NAME, "cluster has unassigned shards");
return allocation.decision(Decision.NO, NAME,
"the cluster has unassigned shards and rebalance type is set to [%s]", type);
}
// in case all indices are assigned, are there initializing shards which
// are not relocating?
if ( allocation.routingNodes().hasInactiveShards() ) {
return allocation.decision(Decision.NO, NAME, "cluster has inactive shards");
return allocation.decision(Decision.NO, NAME,
"the cluster has inactive shards and rebalance type is set to [%s]", type);
}
}
// type == Type.ALWAYS
return allocation.decision(Decision.YES, NAME, "all shards are active");
return allocation.decision(Decision.YES, NAME, "all shards are active, rebalance type is [%s]", type);
}
}

View File

@ -53,7 +53,8 @@ public class ConcurrentRebalanceAllocationDecider extends AllocationDecider {
super(settings);
this.clusterConcurrentRebalance = CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING.get(settings);
logger.debug("using [cluster_concurrent_rebalance] with [{}]", clusterConcurrentRebalance);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING, this::setClusterConcurrentRebalance);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING,
this::setClusterConcurrentRebalance);
}
private void setClusterConcurrentRebalance(int concurrentRebalance) {
@ -63,12 +64,16 @@ public class ConcurrentRebalanceAllocationDecider extends AllocationDecider {
@Override
public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation allocation) {
if (clusterConcurrentRebalance == -1) {
return allocation.decision(Decision.YES, NAME, "all concurrent rebalances are allowed");
return allocation.decision(Decision.YES, NAME, "unlimited concurrent rebalances are allowed");
}
if (allocation.routingNodes().getRelocatingShardCount() >= clusterConcurrentRebalance) {
return allocation.decision(Decision.NO, NAME, "too many concurrent rebalances [%d], limit: [%d]",
allocation.routingNodes().getRelocatingShardCount(), clusterConcurrentRebalance);
int relocatingShards = allocation.routingNodes().getRelocatingShardCount();
if (relocatingShards >= clusterConcurrentRebalance) {
return allocation.decision(Decision.NO, NAME,
"too many shards are concurrently rebalancing [%d], limit: [%d]",
relocatingShards, clusterConcurrentRebalance);
}
return allocation.decision(Decision.YES, NAME, "below threshold [%d] for concurrent rebalances", clusterConcurrentRebalance);
return allocation.decision(Decision.YES, NAME,
"below threshold [%d] for concurrent rebalances, current rebalance shard count [%d]",
clusterConcurrentRebalance, relocatingShards);
}
}

View File

@ -26,6 +26,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
@ -145,6 +146,11 @@ public abstract class Decision implements ToXContent {
public abstract String label();
/**
* Return the list of all decisions that make up this decision
*/
public abstract List<Decision> getDecisions();
/**
* Simple class representing a single decision
*/
@ -191,6 +197,11 @@ public abstract class Decision implements ToXContent {
return this.label;
}
@Override
public List<Decision> getDecisions() {
return Collections.singletonList(this);
}
/**
* Returns the explanation string, fully formatted. Only formats the string once
*/
@ -202,11 +213,35 @@ public abstract class Decision implements ToXContent {
}
@Override
public String toString() {
if (explanation == null) {
return type + "()";
public boolean equals(Object object) {
if (this == object) {
return true;
}
return type + "(" + getExplanation() + ")";
if (object == null || getClass() != object.getClass()) {
return false;
}
Decision.Single s = (Decision.Single) object;
return this.type == s.type &&
this.label.equals(s.label) &&
this.getExplanation().equals(s.getExplanation());
}
@Override
public int hashCode() {
int result = this.type.hashCode();
result = 31 * result + this.label.hashCode();
result = 31 * result + this.getExplanation().hashCode();
return result;
}
@Override
public String toString() {
if (explanationString != null || explanation != null) {
return type + "(" + getExplanation() + ")";
}
return type + "()";
}
@Override
@ -258,6 +293,31 @@ public abstract class Decision implements ToXContent {
return null;
}
@Override
public List<Decision> getDecisions() {
return Collections.unmodifiableList(this.decisions);
}
@Override
public boolean equals(final Object object) {
if (this == object) {
return true;
}
if (object == null || getClass() != object.getClass()) {
return false;
}
final Decision.Multi m = (Decision.Multi) object;
return this.decisions.equals(m.decisions);
}
@Override
public int hashCode() {
return 31 * decisions.hashCode();
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();

View File

@ -164,7 +164,8 @@ public class DiskThresholdDecider extends AllocationDecider {
reroute = true;
explanation = "high disk watermark exceeded on one or more nodes";
} else {
logger.debug("high disk watermark exceeded on {} but an automatic reroute has occurred in the last [{}], skipping reroute",
logger.debug("high disk watermark exceeded on {} but an automatic reroute has occurred " +
"in the last [{}], skipping reroute",
node, DiskThresholdDecider.this.rerouteInterval);
}
nodeHasPassedWatermark.add(node);
@ -183,7 +184,8 @@ public class DiskThresholdDecider extends AllocationDecider {
explanation = "one or more nodes has gone under the high or low watermark";
nodeHasPassedWatermark.remove(node);
} else {
logger.debug("{} has gone below a disk threshold, but an automatic reroute has occurred in the last [{}], skipping reroute",
logger.debug("{} has gone below a disk threshold, but an automatic reroute has occurred " +
"in the last [{}], skipping reroute",
node, DiskThresholdDecider.this.rerouteInterval);
}
}
@ -238,13 +240,15 @@ public class DiskThresholdDecider extends AllocationDecider {
private void setLowWatermark(String lowWatermark) {
// Watermark is expressed in terms of used data, but we need "free" data watermark
this.freeDiskThresholdLow = 100.0 - thresholdPercentageFromWatermark(lowWatermark);
this.freeBytesThresholdLow = thresholdBytesFromWatermark(lowWatermark, CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey());
this.freeBytesThresholdLow = thresholdBytesFromWatermark(lowWatermark,
CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey());
}
private void setHighWatermark(String highWatermark) {
// Watermark is expressed in terms of used data, but we need "free" data watermark
this.freeDiskThresholdHigh = 100.0 - thresholdPercentageFromWatermark(highWatermark);
this.freeBytesThresholdHigh = thresholdBytesFromWatermark(highWatermark, CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey());
this.freeBytesThresholdHigh = thresholdBytesFromWatermark(highWatermark,
CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey());
}
// For Testing
@ -299,7 +303,8 @@ public class DiskThresholdDecider extends AllocationDecider {
* If subtractShardsMovingAway is set then the size of shards moving away is subtracted from the total size
* of all shards
*/
public static long sizeOfRelocatingShards(RoutingNode node, ClusterInfo clusterInfo, boolean subtractShardsMovingAway, String dataPath) {
public static long sizeOfRelocatingShards(RoutingNode node, ClusterInfo clusterInfo,
boolean subtractShardsMovingAway, String dataPath) {
long totalSize = 0;
for (ShardRouting routing : node.shardsWithState(ShardRoutingState.RELOCATING, ShardRoutingState.INITIALIZING)) {
String actualPath = clusterInfo.getDataPath(routing);
@ -353,7 +358,8 @@ public class DiskThresholdDecider extends AllocationDecider {
logger.debug("less than the required {} free bytes threshold ({} bytes free) on node {}, preventing allocation",
freeBytesThresholdLow, freeBytes, node.nodeId());
}
return allocation.decision(Decision.NO, NAME, "less than required [%s] free on node, free: [%s]",
return allocation.decision(Decision.NO, NAME,
"the node is above the low watermark and has less than required [%s] free, free: [%s]",
freeBytesThresholdLow, new ByteSizeValue(freeBytes));
} else if (freeBytes > freeBytesThresholdHigh.bytes()) {
// Allow the shard to be allocated because it is primary that
@ -363,7 +369,8 @@ public class DiskThresholdDecider extends AllocationDecider {
"but allowing allocation because primary has never been allocated",
freeBytesThresholdLow, freeBytes, node.nodeId());
}
return allocation.decision(Decision.YES, NAME, "primary has never been allocated before");
return allocation.decision(Decision.YES, NAME,
"the node is above the low watermark, but this primary shard has never been allocated before");
} else {
// Even though the primary has never been allocated, the node is
// above the high watermark, so don't allow allocating the shard
@ -372,7 +379,9 @@ public class DiskThresholdDecider extends AllocationDecider {
"preventing allocation even though primary has never been allocated",
freeBytesThresholdHigh, freeBytes, node.nodeId());
}
return allocation.decision(Decision.NO, NAME, "less than required [%s] free on node, free: [%s]",
return allocation.decision(Decision.NO, NAME,
"the node is above the high watermark even though this shard has never been allocated " +
"and has less than required [%s] free on node, free: [%s]",
freeBytesThresholdHigh, new ByteSizeValue(freeBytes));
}
}
@ -386,7 +395,8 @@ public class DiskThresholdDecider extends AllocationDecider {
Strings.format1Decimals(usedDiskThresholdLow, "%"),
Strings.format1Decimals(usedDiskPercentage, "%"), node.nodeId());
}
return allocation.decision(Decision.NO, NAME, "more than allowed [%s%%] used disk on node, free: [%s%%]",
return allocation.decision(Decision.NO, NAME,
"the node is above the low watermark and has more than allowed [%s%%] used disk, free: [%s%%]",
usedDiskThresholdLow, freeDiskPercentage);
} else if (freeDiskPercentage > freeDiskThresholdHigh) {
// Allow the shard to be allocated because it is primary that
@ -397,7 +407,8 @@ public class DiskThresholdDecider extends AllocationDecider {
Strings.format1Decimals(usedDiskThresholdLow, "%"),
Strings.format1Decimals(usedDiskPercentage, "%"), node.nodeId());
}
return allocation.decision(Decision.YES, NAME, "primary has never been allocated before");
return allocation.decision(Decision.YES, NAME,
"the node is above the low watermark, but this primary shard has never been allocated before");
} else {
// Even though the primary has never been allocated, the node is
// above the high watermark, so don't allow allocating the shard
@ -407,7 +418,9 @@ public class DiskThresholdDecider extends AllocationDecider {
Strings.format1Decimals(freeDiskThresholdHigh, "%"),
Strings.format1Decimals(freeDiskPercentage, "%"), node.nodeId());
}
return allocation.decision(Decision.NO, NAME, "more than allowed [%s%%] used disk on node, free: [%s%%]",
return allocation.decision(Decision.NO, NAME,
"the node is above the high watermark even though this shard has never been allocated " +
"and has more than allowed [%s%%] used disk, free: [%s%%]",
usedDiskThresholdHigh, freeDiskPercentage);
}
}
@ -417,19 +430,29 @@ public class DiskThresholdDecider extends AllocationDecider {
double freeSpaceAfterShard = freeDiskPercentageAfterShardAssigned(usage, shardSize);
long freeBytesAfterShard = freeBytes - shardSize;
if (freeBytesAfterShard < freeBytesThresholdHigh.bytes()) {
logger.warn("after allocating, node [{}] would have less than the required {} free bytes threshold ({} bytes free), preventing allocation",
logger.warn("after allocating, node [{}] would have less than the required " +
"{} free bytes threshold ({} bytes free), preventing allocation",
node.nodeId(), freeBytesThresholdHigh, freeBytesAfterShard);
return allocation.decision(Decision.NO, NAME, "after allocation less than required [%s] free on node, free: [%s]",
return allocation.decision(Decision.NO, NAME,
"after allocating the shard to this node, it would be above the high watermark " +
"and have less than required [%s] free, free: [%s]",
freeBytesThresholdLow, new ByteSizeValue(freeBytesAfterShard));
}
if (freeSpaceAfterShard < freeDiskThresholdHigh) {
logger.warn("after allocating, node [{}] would have more than the allowed {} free disk threshold ({} free), preventing allocation",
logger.warn("after allocating, node [{}] would have more than the allowed " +
"{} free disk threshold ({} free), preventing allocation",
node.nodeId(), Strings.format1Decimals(freeDiskThresholdHigh, "%"), Strings.format1Decimals(freeSpaceAfterShard, "%"));
return allocation.decision(Decision.NO, NAME, "after allocation more than allowed [%s%%] used disk on node, free: [%s%%]",
return allocation.decision(Decision.NO, NAME,
"after allocating the shard to this node, it would be above the high watermark " +
"and have more than allowed [%s%%] used disk, free: [%s%%]",
usedDiskThresholdLow, freeSpaceAfterShard);
}
return allocation.decision(Decision.YES, NAME, "enough disk for shard on node, free: [%s]", new ByteSizeValue(freeBytes));
return allocation.decision(Decision.YES, NAME,
"enough disk for shard on node, free: [%s], shard size: [%s], free after allocating shard: [%s]",
new ByteSizeValue(freeBytes),
new ByteSizeValue(shardSize),
new ByteSizeValue(freeBytesAfterShard));
}
@Override
@ -453,14 +476,17 @@ public class DiskThresholdDecider extends AllocationDecider {
logger.trace("node [{}] has {}% free disk ({} bytes)", node.nodeId(), freeDiskPercentage, freeBytes);
}
if (dataPath == null || usage.getPath().equals(dataPath) == false) {
return allocation.decision(Decision.YES, NAME, "shard is not allocated on the most utilized disk");
return allocation.decision(Decision.YES, NAME,
"this shard is not allocated on the most utilized disk and can remain");
}
if (freeBytes < freeBytesThresholdHigh.bytes()) {
if (logger.isDebugEnabled()) {
logger.debug("less than the required {} free bytes threshold ({} bytes free) on node {}, shard cannot remain",
freeBytesThresholdHigh, freeBytes, node.nodeId());
}
return allocation.decision(Decision.NO, NAME, "after allocation less than required [%s] free on node, free: [%s]",
return allocation.decision(Decision.NO, NAME,
"after allocating this shard this node would be above the high watermark " +
"and there would be less than required [%s] free on node, free: [%s]",
freeBytesThresholdHigh, new ByteSizeValue(freeBytes));
}
if (freeDiskPercentage < freeDiskThresholdHigh) {
@ -468,11 +494,14 @@ public class DiskThresholdDecider extends AllocationDecider {
logger.debug("less than the required {}% free disk threshold ({}% free) on node {}, shard cannot remain",
freeDiskThresholdHigh, freeDiskPercentage, node.nodeId());
}
return allocation.decision(Decision.NO, NAME, "after allocation less than required [%s%%] free disk on node, free: [%s%%]",
return allocation.decision(Decision.NO, NAME,
"after allocating this shard this node would be above the high watermark " +
"and there would be less than required [%s%%] free disk on node, free: [%s%%]",
freeDiskThresholdHigh, freeDiskPercentage);
}
return allocation.decision(Decision.YES, NAME, "enough disk for shard to remain on node, free: [%s]", new ByteSizeValue(freeBytes));
return allocation.decision(Decision.YES, NAME,
"there is enough disk on this node for the shard to remain, free: [%s]", new ByteSizeValue(freeBytes));
}
private DiskUsage getDiskUsage(RoutingNode node, RoutingAllocation allocation, ImmutableOpenMap<String, DiskUsage> usages) {
@ -543,7 +572,8 @@ public class DiskThresholdDecider extends AllocationDecider {
try {
return RatioValue.parseRatioValue(watermark).getAsPercent();
} catch (ElasticsearchParseException ex) {
// NOTE: this is not end-user leniency, since up above we check that it's a valid byte or percentage, and then store the two cases separately
// NOTE: this is not end-user leniency, since up above we check that it's a valid byte or percentage, and then store the two
// cases separately
return 100.0;
}
}
@ -556,7 +586,8 @@ public class DiskThresholdDecider extends AllocationDecider {
try {
return ByteSizeValue.parseBytesSizeValue(watermark, settingName);
} catch (ElasticsearchParseException ex) {
// NOTE: this is not end-user leniency, since up above we check that it's a valid byte or percentage, and then store the two cases separately
// NOTE: this is not end-user leniency, since up above we check that it's a valid byte or percentage, and then store the two
// cases separately
return ByteSizeValue.parseBytesSizeValue("0b", settingName);
}
}
@ -583,7 +614,7 @@ public class DiskThresholdDecider extends AllocationDecider {
private Decision earlyTerminate(RoutingAllocation allocation, ImmutableOpenMap<String, DiskUsage> usages) {
// Always allow allocation if the decider is disabled
if (!enabled) {
return allocation.decision(Decision.YES, NAME, "disk threshold decider disabled");
return allocation.decision(Decision.YES, NAME, "the disk threshold decider is disabled");
}
// Allow allocation regardless if only a single data node is available
@ -591,7 +622,7 @@ public class DiskThresholdDecider extends AllocationDecider {
if (logger.isTraceEnabled()) {
logger.trace("only a single data node is present, allowing allocation");
}
return allocation.decision(Decision.YES, NAME, "only a single data node is present");
return allocation.decision(Decision.YES, NAME, "there is only a single data node present");
}
// Fail open there is no info available
@ -600,7 +631,7 @@ public class DiskThresholdDecider extends AllocationDecider {
if (logger.isTraceEnabled()) {
logger.trace("cluster info unavailable for disk threshold decider, allowing allocation.");
}
return allocation.decision(Decision.YES, NAME, "cluster info unavailable");
return allocation.decision(Decision.YES, NAME, "the cluster info is unavailable");
}
// Fail open if there are no disk usages available
@ -608,7 +639,7 @@ public class DiskThresholdDecider extends AllocationDecider {
if (logger.isTraceEnabled()) {
logger.trace("unable to determine disk usages for disk-aware allocation, allowing allocation");
}
return allocation.decision(Decision.YES, NAME, "disk usages unavailable");
return allocation.decision(Decision.YES, NAME, "disk usages are unavailable");
}
return null;
}

View File

@ -32,8 +32,9 @@ import org.elasticsearch.common.settings.Settings;
import java.util.Locale;
/**
* This allocation decider allows shard allocations / rebalancing via the cluster wide settings {@link #CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING} /
* {@link #CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING} and the per index setting {@link #INDEX_ROUTING_ALLOCATION_ENABLE_SETTING} / {@link #INDEX_ROUTING_REBALANCE_ENABLE_SETTING}.
* This allocation decider allows shard allocations / rebalancing via the cluster wide settings
* {@link #CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING} / {@link #CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING} and the per index setting
* {@link #INDEX_ROUTING_ALLOCATION_ENABLE_SETTING} / {@link #INDEX_ROUTING_REBALANCE_ENABLE_SETTING}.
* The per index settings overrides the cluster wide setting.
*
* <p>
@ -98,7 +99,7 @@ public class EnableAllocationDecider extends AllocationDecider {
@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
if (allocation.ignoreDisable()) {
return allocation.decision(Decision.YES, NAME, "allocation disabling is ignored");
return allocation.decision(Decision.YES, NAME, "allocation is explicitly ignoring any disabling of allocation");
}
final IndexMetaData indexMetaData = allocation.metaData().getIndexSafe(shardRouting.index());
@ -133,7 +134,7 @@ public class EnableAllocationDecider extends AllocationDecider {
@Override
public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation allocation) {
if (allocation.ignoreDisable()) {
return allocation.decision(Decision.YES, NAME, "rebalance disabling is ignored");
return allocation.decision(Decision.YES, NAME, "allocation is explicitly ignoring any disabling of relocation");
}
Settings indexSettings = allocation.routingNodes().metaData().getIndexSafe(shardRouting.index()).getSettings();
@ -167,7 +168,8 @@ public class EnableAllocationDecider extends AllocationDecider {
/**
* Allocation values or rather their string representation to be used used with
* {@link EnableAllocationDecider#CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING} / {@link EnableAllocationDecider#INDEX_ROUTING_ALLOCATION_ENABLE_SETTING}
* {@link EnableAllocationDecider#CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING} /
* {@link EnableAllocationDecider#INDEX_ROUTING_ALLOCATION_ENABLE_SETTING}
* via cluster / index settings.
*/
public enum Allocation {
@ -193,7 +195,8 @@ public class EnableAllocationDecider extends AllocationDecider {
/**
* Rebalance values or rather their string representation to be used used with
* {@link EnableAllocationDecider#CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING} / {@link EnableAllocationDecider#INDEX_ROUTING_REBALANCE_ENABLE_SETTING}
* {@link EnableAllocationDecider#CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING} /
* {@link EnableAllocationDecider#INDEX_ROUTING_REBALANCE_ENABLE_SETTING}
* via cluster / index settings.
*/
public enum Rebalance {

View File

@ -50,11 +50,14 @@ import static org.elasticsearch.cluster.node.DiscoveryNodeFilters.OpType.OR;
* would disallow the allocation. Filters are applied in the following order:
* <ol>
* <li><tt>required</tt> - filters required allocations.
* If any <tt>required</tt> filters are set the allocation is denied if the index is <b>not</b> in the set of <tt>required</tt> to allocate on the filtered node</li>
* If any <tt>required</tt> filters are set the allocation is denied if the index is <b>not</b> in the set of <tt>required</tt> to allocate
* on the filtered node</li>
* <li><tt>include</tt> - filters "allowed" allocations.
* If any <tt>include</tt> filters are set the allocation is denied if the index is <b>not</b> in the set of <tt>include</tt> filters for the filtered node</li>
* If any <tt>include</tt> filters are set the allocation is denied if the index is <b>not</b> in the set of <tt>include</tt> filters for
* the filtered node</li>
* <li><tt>exclude</tt> - filters "prohibited" allocations.
* If any <tt>exclude</tt> filters are set the allocation is denied if the index is in the set of <tt>exclude</tt> filters for the filtered node</li>
* If any <tt>exclude</tt> filters are set the allocation is denied if the index is in the set of <tt>exclude</tt> filters for the
* filtered node</li>
* </ol>
*/
public class FilterAllocationDecider extends AllocationDecider {

View File

@ -52,7 +52,7 @@ public class NodeVersionAllocationDecider extends AllocationDecider {
return isVersionCompatible(shardRouting.restoreSource(), node, allocation);
} else {
// fresh primary, we can allocate wherever
return allocation.decision(Decision.YES, NAME, "primary shard can be allocated anywhere");
return allocation.decision(Decision.YES, NAME, "the primary shard is new and can be allocated anywhere");
}
} else {
// relocating primary, only migrate to newer host
@ -70,16 +70,17 @@ public class NodeVersionAllocationDecider extends AllocationDecider {
}
}
private Decision isVersionCompatible(final RoutingNodes routingNodes, final String sourceNodeId, final RoutingNode target, RoutingAllocation allocation) {
private Decision isVersionCompatible(final RoutingNodes routingNodes, final String sourceNodeId, final RoutingNode target,
RoutingAllocation allocation) {
final RoutingNode source = routingNodes.node(sourceNodeId);
if (target.node().version().onOrAfter(source.node().version())) {
/* we can allocate if we can recover from a node that is younger or on the same version
* if the primary is already running on a newer version that won't work due to possible
* differences in the lucene index format etc.*/
return allocation.decision(Decision.YES, NAME, "target node version [%s] is same or newer than source node version [%s]",
return allocation.decision(Decision.YES, NAME, "target node version [%s] is the same or newer than source node version [%s]",
target.node().version(), source.node().version());
} else {
return allocation.decision(Decision.NO, NAME, "target node version [%s] is older than source node version [%s]",
return allocation.decision(Decision.NO, NAME, "target node version [%s] is older than the source node version [%s]",
target.node().version(), source.node().version());
}
}
@ -87,10 +88,10 @@ public class NodeVersionAllocationDecider extends AllocationDecider {
private Decision isVersionCompatible(RestoreSource restoreSource, final RoutingNode target, RoutingAllocation allocation) {
if (target.node().version().onOrAfter(restoreSource.version())) {
/* we can allocate if we can restore from a snapshot that is older or on the same version */
return allocation.decision(Decision.YES, NAME, "target node version [%s] is same or newer than snapshot version [%s]",
return allocation.decision(Decision.YES, NAME, "target node version [%s] is the same or newer than snapshot version [%s]",
target.node().version(), restoreSource.version());
} else {
return allocation.decision(Decision.NO, NAME, "target node version [%s] is older than snapshot version [%s]",
return allocation.decision(Decision.NO, NAME, "target node version [%s] is older than the snapshot version [%s]",
target.node().version(), restoreSource.version());
}
}

View File

@ -41,8 +41,8 @@ public class RebalanceOnlyWhenActiveAllocationDecider extends AllocationDecider
// its ok to check for active here, since in relocation, a shard is split into two in routing
// nodes, once relocating, and one initializing
if (!allocation.routingNodes().allReplicasActive(shardRouting)) {
return allocation.decision(Decision.NO, NAME, "not all replicas are active in cluster");
return allocation.decision(Decision.NO, NAME, "rebalancing can not occur if not all replicas are active in the cluster");
}
return allocation.decision(Decision.YES, NAME, "all replicas are active in cluster");
return allocation.decision(Decision.YES, NAME, "all replicas are active in the cluster, rebalancing can occur");
}
}

View File

@ -45,12 +45,12 @@ public class ReplicaAfterPrimaryActiveAllocationDecider extends AllocationDecide
@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocation) {
if (shardRouting.primary()) {
return allocation.decision(Decision.YES, NAME, "shard is primary");
return allocation.decision(Decision.YES, NAME, "shard is primary and can be allocated");
}
ShardRouting primary = allocation.routingNodes().activePrimary(shardRouting);
if (primary == null) {
return allocation.decision(Decision.NO, NAME, "primary shard is not yet active");
return allocation.decision(Decision.NO, NAME, "primary shard for this replica is not yet active");
}
return allocation.decision(Decision.YES, NAME, "primary is already active");
return allocation.decision(Decision.YES, NAME, "primary shard for this replica is already active");
}
}

View File

@ -61,7 +61,8 @@ public class SameShardAllocationDecider extends AllocationDecider {
Iterable<ShardRouting> assignedShards = allocation.routingNodes().assignedShards(shardRouting);
for (ShardRouting assignedShard : assignedShards) {
if (node.nodeId().equals(assignedShard.currentNodeId())) {
return allocation.decision(Decision.NO, NAME, "shard cannot be allocated on same node [%s] it already exists on", node.nodeId());
return allocation.decision(Decision.NO, NAME,
"the shard cannot be allocated on the same node id [%s] on which it already exists", node.nodeId());
}
}
if (sameHost) {
@ -85,7 +86,7 @@ public class SameShardAllocationDecider extends AllocationDecider {
for (ShardRouting assignedShard : assignedShards) {
if (checkNode.nodeId().equals(assignedShard.currentNodeId())) {
return allocation.decision(Decision.NO, NAME,
"shard cannot be allocated on same host [%s] it already exists on", node.nodeId());
"shard cannot be allocated on the same host [%s] on which it already exists", node.nodeId());
}
}
}

View File

@ -93,7 +93,7 @@ public class ShardsLimitAllocationDecider extends AllocationDecider {
final int clusterShardLimit = this.clusterShardLimit;
if (indexShardLimit <= 0 && clusterShardLimit <= 0) {
return allocation.decision(Decision.YES, NAME, "total shard limit disabled: [index: %d, cluster: %d] <= 0",
return allocation.decision(Decision.YES, NAME, "total shard limits are disabled: [index: %d, cluster: %d] <= 0",
indexShardLimit, clusterShardLimit);
}
@ -110,14 +110,16 @@ public class ShardsLimitAllocationDecider extends AllocationDecider {
}
}
if (clusterShardLimit > 0 && nodeShardCount >= clusterShardLimit) {
return allocation.decision(Decision.NO, NAME, "too many shards for this node [%d], limit: [%d]",
return allocation.decision(Decision.NO, NAME, "too many shards for this node [%d], cluster-level limit per node: [%d]",
nodeShardCount, clusterShardLimit);
}
if (indexShardLimit > 0 && indexShardCount >= indexShardLimit) {
return allocation.decision(Decision.NO, NAME, "too many shards for this index [%s] on node [%d], limit: [%d]",
return allocation.decision(Decision.NO, NAME,
"too many shards for this index [%s] on node [%d], index-level limit per node: [%d]",
shardRouting.index(), indexShardCount, indexShardLimit);
}
return allocation.decision(Decision.YES, NAME, "shard count under index limit [%d] and node limit [%d] of total shards per node",
return allocation.decision(Decision.YES, NAME,
"the shard count is under index limit [%d] and cluster level node limit [%d] of total shards per node",
indexShardLimit, clusterShardLimit);
}
@ -130,7 +132,7 @@ public class ShardsLimitAllocationDecider extends AllocationDecider {
final int clusterShardLimit = this.clusterShardLimit;
if (indexShardLimit <= 0 && clusterShardLimit <= 0) {
return allocation.decision(Decision.YES, NAME, "total shard limit disabled: [index: %d, cluster: %d] <= 0",
return allocation.decision(Decision.YES, NAME, "total shard limits are disabled: [index: %d, cluster: %d] <= 0",
indexShardLimit, clusterShardLimit);
}
@ -149,14 +151,16 @@ public class ShardsLimitAllocationDecider extends AllocationDecider {
// Subtle difference between the `canAllocate` and `canRemain` is that
// this checks > while canAllocate checks >=
if (clusterShardLimit > 0 && nodeShardCount > clusterShardLimit) {
return allocation.decision(Decision.NO, NAME, "too many shards for this node [%d], limit: [%d]",
return allocation.decision(Decision.NO, NAME, "too many shards for this node [%d], cluster-level limit per node: [%d]",
nodeShardCount, clusterShardLimit);
}
if (indexShardLimit > 0 && indexShardCount > indexShardLimit) {
return allocation.decision(Decision.NO, NAME, "too many shards for this index [%s] on node [%d], limit: [%d]",
return allocation.decision(Decision.NO, NAME,
"too many shards for this index [%s] on node [%d], index-level limit per node: [%d]",
shardRouting.index(), indexShardCount, indexShardLimit);
}
return allocation.decision(Decision.YES, NAME, "shard count under index limit [%d] and node limit [%d] of total shards per node",
return allocation.decision(Decision.YES, NAME,
"the shard count is under index limit [%d] and cluster level node limit [%d] of total shards per node",
indexShardLimit, clusterShardLimit);
}
@ -168,7 +172,7 @@ public class ShardsLimitAllocationDecider extends AllocationDecider {
final int clusterShardLimit = this.clusterShardLimit;
if (clusterShardLimit <= 0) {
return allocation.decision(Decision.YES, NAME, "total shard limit disabled: [cluster: %d] <= 0",
return allocation.decision(Decision.YES, NAME, "total shard limits are disabled: [cluster: %d] <= 0",
clusterShardLimit);
}
@ -181,10 +185,10 @@ public class ShardsLimitAllocationDecider extends AllocationDecider {
nodeShardCount++;
}
if (clusterShardLimit >= 0 && nodeShardCount >= clusterShardLimit) {
return allocation.decision(Decision.NO, NAME, "too many shards for this node [%d], limit: [%d]",
return allocation.decision(Decision.NO, NAME, "too many shards for this node [%d], cluster-level limit per node: [%d]",
nodeShardCount, clusterShardLimit);
}
return allocation.decision(Decision.YES, NAME, "shard count under node limit [%d] of total shards per node",
return allocation.decision(Decision.YES, NAME, "the shard count is under node limit [%d] of total shards per node",
clusterShardLimit);
}
}

View File

@ -54,7 +54,8 @@ public class SnapshotInProgressAllocationDecider extends AllocationDecider {
}
/**
* Creates a new {@link org.elasticsearch.cluster.routing.allocation.decider.SnapshotInProgressAllocationDecider} instance from given settings
* Creates a new {@link org.elasticsearch.cluster.routing.allocation.decider.SnapshotInProgressAllocationDecider} instance from
* given settings
*
* @param settings {@link org.elasticsearch.common.settings.Settings} to use
*/
@ -66,7 +67,8 @@ public class SnapshotInProgressAllocationDecider extends AllocationDecider {
public SnapshotInProgressAllocationDecider(Settings settings, ClusterSettings clusterSettings) {
super(settings);
enableRelocation = CLUSTER_ROUTING_ALLOCATION_SNAPSHOT_RELOCATION_ENABLED_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_SNAPSHOT_RELOCATION_ENABLED_SETTING, this::setEnableRelocation);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_SNAPSHOT_RELOCATION_ENABLED_SETTING,
this::setEnableRelocation);
}
private void setEnableRelocation(boolean enableRelocation) {
@ -104,14 +106,18 @@ public class SnapshotInProgressAllocationDecider extends AllocationDecider {
for (SnapshotsInProgress.Entry snapshot : snapshotsInProgress.entries()) {
SnapshotsInProgress.ShardSnapshotStatus shardSnapshotStatus = snapshot.shards().get(shardRouting.shardId());
if (shardSnapshotStatus != null && !shardSnapshotStatus.state().completed() && shardSnapshotStatus.nodeId() != null && shardSnapshotStatus.nodeId().equals(shardRouting.currentNodeId())) {
logger.trace("Preventing snapshotted shard [{}] to be moved from node [{}]", shardRouting.shardId(), shardSnapshotStatus.nodeId());
if (shardSnapshotStatus != null && !shardSnapshotStatus.state().completed() && shardSnapshotStatus.nodeId() != null &&
shardSnapshotStatus.nodeId().equals(shardRouting.currentNodeId())) {
if (logger.isTraceEnabled()) {
logger.trace("Preventing snapshotted shard [{}] to be moved from node [{}]",
shardRouting.shardId(), shardSnapshotStatus.nodeId());
}
return allocation.decision(Decision.NO, NAME, "snapshot for shard [%s] is currently running on node [%s]",
shardRouting.shardId(), shardSnapshotStatus.nodeId());
}
}
}
return allocation.decision(Decision.YES, NAME, "shard not primary or relocation disabled");
return allocation.decision(Decision.YES, NAME, "the shard is not primary or relocation is disabled");
}
}

View File

@ -84,11 +84,16 @@ public class ThrottlingAllocationDecider extends AllocationDecider {
concurrentIncomingRecoveries = CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING.get(settings);
concurrentOutgoingRecoveries = CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING, this::setPrimariesInitialRecoveries);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING, this::setConcurrentIncomingRecoverries);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING, this::setConcurrentOutgoingRecoverries);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING,
this::setPrimariesInitialRecoveries);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING,
this::setConcurrentIncomingRecoverries);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING,
this::setConcurrentOutgoingRecoverries);
logger.debug("using node_concurrent_outgoing_recoveries [{}], node_concurrent_incoming_recoveries [{}], node_initial_primaries_recoveries [{}]", concurrentOutgoingRecoveries, concurrentIncomingRecoveries, primariesInitialRecoveries);
logger.debug("using node_concurrent_outgoing_recoveries [{}], node_concurrent_incoming_recoveries [{}], " +
"node_initial_primaries_recoveries [{}]",
concurrentOutgoingRecoveries, concurrentIncomingRecoveries, primariesInitialRecoveries);
}
private void setConcurrentIncomingRecoverries(int concurrentIncomingRecoveries) {
@ -118,7 +123,7 @@ public class ThrottlingAllocationDecider extends AllocationDecider {
}
}
if (primariesInRecovery >= primariesInitialRecoveries) {
return allocation.decision(Decision.THROTTLE, NAME, "too many primaries currently recovering [%d], limit: [%d]",
return allocation.decision(Decision.THROTTLE, NAME, "too many primaries are currently recovering [%d], limit: [%d]",
primariesInRecovery, primariesInitialRecoveries);
} else {
return allocation.decision(Decision.YES, NAME, "below primary recovery limit of [%d]", primariesInitialRecoveries);
@ -137,13 +142,17 @@ public class ThrottlingAllocationDecider extends AllocationDecider {
int currentOutRecoveries = allocation.routingNodes().getOutgoingRecoveries(node.nodeId());
int currentInRecoveries = allocation.routingNodes().getIncomingRecoveries(node.nodeId());
if (currentOutRecoveries >= concurrentOutgoingRecoveries) {
return allocation.decision(Decision.THROTTLE, NAME, "too many outgoing shards currently recovering [%d], limit: [%d]",
return allocation.decision(Decision.THROTTLE, NAME, "too many outgoing shards are currently recovering [%d], limit: [%d]",
currentOutRecoveries, concurrentOutgoingRecoveries);
} else if (currentInRecoveries >= concurrentIncomingRecoveries) {
return allocation.decision(Decision.THROTTLE, NAME, "too many incoming shards currently recovering [%d], limit: [%d]",
return allocation.decision(Decision.THROTTLE, NAME, "too many incoming shards are currently recovering [%d], limit: [%d]",
currentInRecoveries, concurrentIncomingRecoveries);
} else {
return allocation.decision(Decision.YES, NAME, "below shard recovery limit of outgoing: [%d] incoming: [%d]", concurrentOutgoingRecoveries, concurrentIncomingRecoveries);
return allocation.decision(Decision.YES, NAME, "below shard recovery limit of outgoing: [%d < %d] incoming: [%d < %d]",
currentOutRecoveries,
concurrentOutgoingRecoveries,
currentInRecoveries,
concurrentIncomingRecoveries);
}
}
}

View File

@ -38,6 +38,7 @@ import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.http.netty.NettyHttpServerTransport;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestHandler;
import org.elasticsearch.rest.action.admin.cluster.allocation.RestClusterAllocationExplainAction;
import org.elasticsearch.rest.action.admin.cluster.health.RestClusterHealthAction;
import org.elasticsearch.rest.action.admin.cluster.node.hotthreads.RestNodesHotThreadsAction;
import org.elasticsearch.rest.action.admin.cluster.node.info.RestNodesInfoAction;
@ -171,6 +172,7 @@ public class NetworkModule extends AbstractModule {
RestNodesInfoAction.class,
RestNodesStatsAction.class,
RestNodesHotThreadsAction.class,
RestClusterAllocationExplainAction.class,
RestClusterStatsAction.class,
RestClusterStateAction.class,
RestClusterHealthAction.class,

View File

@ -0,0 +1,89 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.rest.action.admin.cluster.allocation;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainRequest;
import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.support.RestActions;
import org.elasticsearch.rest.action.support.RestBuilderListener;
import java.io.IOException;
/**
* Class handling cluster allocation explanation at the REST level
*/
public class RestClusterAllocationExplainAction extends BaseRestHandler {
@Inject
public RestClusterAllocationExplainAction(Settings settings, RestController controller, Client client) {
super(settings, client);
controller.registerHandler(RestRequest.Method.GET, "/_cluster/allocation/explain", this);
controller.registerHandler(RestRequest.Method.POST, "/_cluster/allocation/explain", this);
}
@Override
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
ClusterAllocationExplainRequest req;
if (RestActions.hasBodyContent(request) == false) {
// Empty request signals "explain the first unassigned shard you find"
req = new ClusterAllocationExplainRequest();
} else {
BytesReference content = RestActions.getRestContent(request);
try (XContentParser parser = XContentFactory.xContent(content).createParser(content)) {
req = ClusterAllocationExplainRequest.parse(parser);
} catch (IOException e) {
logger.debug("failed to parse allocation explain request", e);
channel.sendResponse(new BytesRestResponse(ExceptionsHelper.status(e)));
return;
}
}
try {
req.includeYesDecisions(request.paramAsBoolean("include_yes_decisions", false));
client.admin().cluster().allocationExplain(req, new RestBuilderListener<ClusterAllocationExplainResponse>(channel) {
@Override
public RestResponse buildResponse(ClusterAllocationExplainResponse response, XContentBuilder builder) throws Exception {
response.getExplanation().toXContent(builder, ToXContent.EMPTY_PARAMS);
return new BytesRestResponse(RestStatus.OK, builder);
}
});
} catch (Exception e) {
logger.error("failed to explain allocation", e);
channel.sendResponse(new BytesRestResponse(ExceptionsHelper.status(e)));
}
}
}

View File

@ -0,0 +1,86 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.allocation;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.test.ESSingleNodeTestCase;
/**
* Tests for the cluster allocation explanation
*/
public final class ClusterAllocationExplainTests extends ESSingleNodeTestCase {
public void testShardExplain() throws Exception {
client().admin().indices().prepareCreate("test")
.setSettings("index.number_of_shards", 1, "index.number_of_replicas", 1).get();
client().admin().cluster().health(Requests.clusterHealthRequest("test").waitForYellowStatus()).get();
ClusterAllocationExplainResponse resp = client().admin().cluster().prepareAllocationExplain()
.setIndex("test").setShard(0).setPrimary(false).get();
ClusterAllocationExplanation cae = resp.getExplanation();
assertNotNull("should always have an explanation", cae);
assertEquals("test", cae.getShard().getIndexName());
assertEquals(0, cae.getShard().getId());
assertEquals(false, cae.isPrimary());
assertNull(cae.getAssignedNodeId());
assertNotNull(cae.getUnassignedInfo());
Decision d = cae.getNodeDecisions().values().iterator().next();
assertNotNull("should have a decision", d);
assertEquals(Decision.Type.NO, d.type());
assertTrue(d.toString(), d.toString().contains("NO(the shard cannot be allocated on the same node id"));
assertTrue(d instanceof Decision.Multi);
Decision.Multi md = (Decision.Multi) d;
Decision ssd = md.getDecisions().get(0);
assertEquals(Decision.Type.NO, ssd.type());
assertTrue(ssd.toString(), ssd.toString().contains("NO(the shard cannot be allocated on the same node id"));
Float weight = cae.getNodeWeights().values().iterator().next();
assertNotNull("should have a weight", weight);
resp = client().admin().cluster().prepareAllocationExplain().setIndex("test").setShard(0).setPrimary(true).get();
cae = resp.getExplanation();
assertNotNull("should always have an explanation", cae);
assertEquals("test", cae.getShard().getIndexName());
assertEquals(0, cae.getShard().getId());
assertEquals(true, cae.isPrimary());
assertNotNull("shard should have assigned node id", cae.getAssignedNodeId());
assertNull("assigned shard should not have unassigned info", cae.getUnassignedInfo());
d = cae.getNodeDecisions().values().iterator().next();
assertNotNull("should have a decision", d);
assertEquals(Decision.Type.NO, d.type());
assertTrue(d.toString(), d.toString().contains("NO(the shard cannot be allocated on the same node id"));
assertTrue(d instanceof Decision.Multi);
md = (Decision.Multi) d;
ssd = md.getDecisions().get(0);
assertEquals(Decision.Type.NO, ssd.type());
assertTrue(ssd.toString(), ssd.toString().contains("NO(the shard cannot be allocated on the same node id"));
weight = cae.getNodeWeights().values().iterator().next();
assertNotNull("should have a weight", weight);
resp = client().admin().cluster().prepareAllocationExplain().useAnyUnassignedShard().get();
cae = resp.getExplanation();
assertNotNull("should always have an explanation", cae);
assertEquals("test", cae.getShard().getIndexName());
assertEquals(0, cae.getShard().getId());
assertEquals(false, cae.isPrimary());
}
}

View File

@ -0,0 +1,91 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.allocation;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.DummyTransportAddress;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;
import java.util.HashMap;
import java.util.Map;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
/**
* Tests for the cluster allocation explanation
*/
public final class ClusterAllocationExplanationTests extends ESTestCase {
public void testDecisionEquality() {
Decision.Multi d = new Decision.Multi();
Decision.Multi d2 = new Decision.Multi();
d.add(Decision.single(Decision.Type.NO, "no label", "because I said no"));
d.add(Decision.single(Decision.Type.YES, "yes label", "yes please"));
d.add(Decision.single(Decision.Type.THROTTLE, "throttle label", "wait a sec"));
d2.add(Decision.single(Decision.Type.NO, "no label", "because I said no"));
d2.add(Decision.single(Decision.Type.YES, "yes label", "yes please"));
d2.add(Decision.single(Decision.Type.THROTTLE, "throttle label", "wait a sec"));
assertEquals(d, d2);
}
public void testExplanationSerialization() throws Exception {
ShardId shard = new ShardId("test", "uuid", 0);
Map<DiscoveryNode, Decision> nodeToDecisions = new HashMap<>();
Map<DiscoveryNode, Float> nodeToWeight = new HashMap<>();
for (int i = randomIntBetween(2, 5); i > 0; i--) {
DiscoveryNode dn = new DiscoveryNode("node-" + i, DummyTransportAddress.INSTANCE, Version.CURRENT);
Decision.Multi d = new Decision.Multi();
d.add(Decision.single(Decision.Type.NO, "no label", "because I said no"));
d.add(Decision.single(Decision.Type.YES, "yes label", "yes please"));
d.add(Decision.single(Decision.Type.THROTTLE, "throttle label", "wait a sec"));
nodeToDecisions.put(dn, d);
nodeToWeight.put(dn, randomFloat());
}
ClusterAllocationExplanation cae = new ClusterAllocationExplanation(shard, true, "assignedNode", null,
nodeToDecisions, nodeToWeight);
BytesStreamOutput out = new BytesStreamOutput();
cae.writeTo(out);
StreamInput in = StreamInput.wrap(out.bytes());
ClusterAllocationExplanation cae2 = new ClusterAllocationExplanation(in);
assertEquals(shard, cae2.getShard());
assertTrue(cae2.isPrimary());
assertTrue(cae2.isAssigned());
assertEquals("assignedNode", cae2.getAssignedNodeId());
assertNull(cae2.getUnassignedInfo());
for (Map.Entry<DiscoveryNode, Decision> entry : cae2.getNodeDecisions().entrySet()) {
assertEquals(nodeToDecisions.get(entry.getKey()), entry.getValue());
}
assertEquals(nodeToWeight, cae2.getNodeWeights());
}
}

View File

@ -22,6 +22,7 @@ package org.elasticsearch.cluster;
import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest;
import org.elasticsearch.cluster.metadata.IndexTemplateFilter;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
@ -39,6 +40,8 @@ import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsModule;
import java.util.HashMap;
import java.util.Map;
public class ClusterModuleTests extends ModuleTestCase {
public static class FakeAllocationDecider extends AllocationDecider {
@ -52,6 +55,11 @@ public class ClusterModuleTests extends ModuleTestCase {
public boolean allocate(RoutingAllocation allocation) {
return false;
}
@Override
public Map<DiscoveryNode, Float> weighShard(RoutingAllocation allocation, ShardRouting shard) {
return new HashMap<>();
}
}
static class FakeIndexTemplateFilter implements IndexTemplateFilter {

View File

@ -33,6 +33,7 @@ import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
@ -44,6 +45,8 @@ import org.elasticsearch.test.ESAllocationTestCase;
import org.elasticsearch.test.gateway.NoopGatewayAllocator;
import org.hamcrest.Matchers;
import java.util.HashMap;
import java.util.Map;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
@ -313,6 +316,9 @@ public class BalanceConfigurationTests extends ESAllocationTestCase {
new ClusterSettings(Settings.Builder.EMPTY_SETTINGS, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), getRandom()),
NoopGatewayAllocator.INSTANCE, new ShardsAllocator() {
public Map<DiscoveryNode, Float> weighShard(RoutingAllocation allocation, ShardRouting shard) {
return new HashMap<DiscoveryNode, Float>();
}
/*
* // this allocator tries to rebuild this scenario where a rebalance is
* // triggered solely by the primary overload on node [1] where a shard

View File

@ -59,6 +59,7 @@ import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
import static org.elasticsearch.cluster.routing.ShardRoutingState.UNASSIGNED;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
@ -794,7 +795,8 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
fail("should not have been able to reroute the shard");
} catch (IllegalArgumentException e) {
assertThat("can't allocated because there isn't enough room: " + e.getMessage(),
e.getMessage().contains("more than allowed [70.0%] used disk on node, free: [26.0%]"), equalTo(true));
e.getMessage(),
containsString("the node is above the low watermark and has more than allowed [70.0%] used disk, free: [26.0%]"));
}
}

View File

@ -46,3 +46,5 @@ include::cluster/nodes-stats.asciidoc[]
include::cluster/nodes-info.asciidoc[]
include::cluster/nodes-hot-threads.asciidoc[]
include::cluster/allocation-explain.asciidoc[]

View File

@ -0,0 +1,159 @@
[[cluster-allocation-explain]]
== Cluster Allocation Explain API
The cluster allocation explanation API is designed to assist in answering the
question "why is this shard unassigned?". To explain the allocation (on
unassigned state) of a shard, issue a request like:
[source,js]
--------------------------------------------------
$ curl -XGET 'http://localhost:9200/_cluster/allocation/explain' -d'{
"index": "myindex",
"shard": 0,
"primary": false
}'
--------------------------------------------------
Specify the `index` and `shard` id of the shard you would like an explanation
for, as well as the `primary` flag to indicate whether to explain a primary or
replica shard.
The response looks like:
[source,js]
--------------------------------------------------
{
"shard" : {
"index" : "myindex",
"index_uuid" : "KnW0-zELRs6PK84l0r38ZA",
"id" : 0,
"primary" : false
},
"assigned" : false, <1>
"unassigned_info" : {
"reason" : "INDEX_CREATED", <2>
"at" : "2016-03-22T20:04:23.620Z"
},
"nodes" : { <3>
"V-Spi0AyRZ6ZvKbaI3691w" : {
"node_name" : "node1",
"node_attributes" : { <4>
"bar" : "baz"
},
"final_decision" : "NO", <5>
"weight" : 0.06666675, <6>
"decisions" : [ { <7>
"decider" : "filter",
"decision" : "NO",
"explanation" : "node does not match index include filters [foo:\"bar\"]"
} ]
},
"Qc6VL8c5RWaw1qXZ0Rg57g" : {
"node_name" : "node2",
"node_attributes" : {
"bar" : "baz",
"foo" : "bar"
},
"final_decision" : "NO",
"weight" : -1.3833332,
"decisions" : [ {
"decider" : "same_shard",
"decision" : "NO",
"explanation" : "the shard cannot be allocated on the same node id [Qc6VL8c5RWaw1qXZ0Rg57g] on which it already exists"
} ]
},
"PzdyMZGXQdGhqTJHF_hGgA" : {
"node_name" : "node3",
"node_attributes" : { },
"final_decision" : "NO",
"weight" : 2.3166666,
"decisions" : [ {
"decider" : "filter",
"decision" : "NO",
"explanation" : "node does not match index include filters [foo:\"bar\"]"
} ]
}
}
}
--------------------------------------------------
<1> Whether the shard is assigned or unassigned
<2> Reason for the shard originally becoming unassigned
<3> List of node decisions about the shard
<4> User-added attributes the node has
<5> Final decision for whether the shard is allowed to be allocated to this node
<6> Weight for how much the allocator would like to allocate the shard to this node
<7> List of decisions factoring into final decision
For a shard that is already assigned, the output looks similar to:
[source,js]
--------------------------------------------------
{
"shard" : {
"index" : "only-foo",
"index_uuid" : "KnW0-zELRs6PK84l0r38ZA",
"id" : 0,
"primary" : true
},
"assigned" : true,
"assigned_node_id" : "Qc6VL8c5RWaw1qXZ0Rg57g", <1>
"nodes" : {
"V-Spi0AyRZ6ZvKbaI3691w" : {
"node_name" : "Susan Storm",
"node_attributes" : {
"bar" : "baz"
},
"final_decision" : "NO",
"weight" : 1.4499999,
"decisions" : [ {
"decider" : "filter",
"decision" : "NO",
"explanation" : "node does not match index include filters [foo:\"bar\"]"
} ]
},
"Qc6VL8c5RWaw1qXZ0Rg57g" : {
"node_name" : "Slipstream",
"node_attributes" : {
"bar" : "baz",
"foo" : "bar"
},
"final_decision" : "CURRENTLY_ASSIGNED", <2>
"weight" : 0.0,
"decisions" : [ {
"decider" : "same_shard",
"decision" : "NO",
"explanation" : "the shard cannot be allocated on the same node id [Qc6VL8c5RWaw1qXZ0Rg57g] on which it already exists"
} ]
},
"PzdyMZGXQdGhqTJHF_hGgA" : {
"node_name" : "The Symbiote",
"node_attributes" : { },
"final_decision" : "NO",
"weight" : 3.6999998,
"decisions" : [ {
"decider" : "filter",
"decision" : "NO",
"explanation" : "node does not match index include filters [foo:\"bar\"]"
} ]
}
}
}
--------------------------------------------------
<1> Node the shard is currently assigned to
<2> The decision is "CURRENTLY_ASSIGNED" because the shard is currently assigned to this node
You can also have Elasticsearch explain the allocation of the first unassigned
shard it finds by sending an empty body, such as:
[source,js]
--------------------------------------------------
$ curl -XGET 'http://localhost:9200/_cluster/allocation/explain'
--------------------------------------------------
And if you would like to include all decisions that were factored into the final
decision, the `include_yes_decisions` parameter will return all decisions:
[source,js]
--------------------------------------------------
$ curl -XGET 'http://localhost:9200/_cluster/allocation/explain?include_yes_decisions=true'
--------------------------------------------------

View File

@ -0,0 +1,20 @@
{
"cluster.allocation_explain": {
"documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/master/cluster-allocation-explain.html",
"methods": ["GET", "POST"],
"url": {
"path": "/_cluster/allocation/explain",
"paths": ["/_cluster/allocation/explain"],
"parts": {},
"params": {
"include_yes_decisions": {
"type": "boolean",
"description": "Return 'YES' decisions in explanation (default: false)"
}
}
},
"body": {
"description": "The index, shard, and primary flag to explain. Empty means 'explain the first unassigned shard'"
}
}
}

View File

@ -0,0 +1,76 @@
---
"cluster shard allocation explanation test":
- do:
# there aren't any unassigned shards to explain
catch: /unable to find any shards to explain/
cluster.allocation_explain: {}
- do:
indices.create:
index: test
- do:
cluster.health:
wait_for_status: yellow
- do:
cluster.state:
metric: [ master_node ]
- set: {master_node: node_id}
# This relies on there only being a single node in the test cluster, which
# is currently true, but if this changes in the future this test will need
# to be changed
- do:
cluster.allocation_explain:
body: { "index": "test", "shard": 0, "primary": true }
- match: { assigned: true }
# - match: { assigned_node_id: $node_id }
- is_true: assigned_node_id
- match: { shard.index: "test" }
- match: { shard.id: 0 }
- match: { shard.primary: true }
# unfortunately can't test these because they break with multi-node backwords
# compat REST tests
# - is_true: nodes.$node_id.node_name
# - match: { nodes.$node_id.node_attributes.testattr: "test" }
# - match: { nodes.$node_id.node_attributes.portsfile: "true" }
# - match: { nodes.$node_id.final_decision: "CURRENTLY_ASSIGNED" }
# - match: { nodes.$node_id.weight: 0.0 }
# - match: { nodes.$node_id.decisions.0.decider: "same_shard" }
# - match: { nodes.$node_id.decisions.0.decision: "NO" }
---
"cluster shard allocation explanation test with empty request":
- do:
indices.create:
index: test
body: { "index.number_of_shards": 1, "index.number_of_replicas": 9 }
- do:
cluster.health:
wait_for_status: yellow
- do:
cluster.state:
metric: [ master_node ]
- set: {master_node: node_id}
- do:
cluster.allocation_explain: {}
- match: { assigned: false }
- match: { unassigned_info.reason: "INDEX_CREATED" }
- is_true: unassigned_info.at
- match: { shard.index: "test" }
- match: { shard.id: 0 }
- match: { shard.primary: false }
# - is_true: nodes.$node_id.node_name
# - match: { nodes.$node_id.node_attributes.testattr: "test" }
# - match: { nodes.$node_id.node_attributes.portsfile: "true" }
# - match: { nodes.$node_id.final_decision: "NO" }
# - match: { nodes.$node_id.decisions.0.decider: "same_shard" }
# - match: { nodes.$node_id.decisions.0.decision: "NO" }

View File

@ -73,7 +73,7 @@ public class RestTestSuiteParser implements RestTestFragmentParser<RestTestSuite
XContentParser parser = parseContext.parser();
parser.nextToken();
assert parser.currentToken() == XContentParser.Token.START_OBJECT;
assert parser.currentToken() == XContentParser.Token.START_OBJECT : "expected token to be START_OBJECT but was " + parser.currentToken();
RestTestSuite restTestSuite = new RestTestSuite(parseContext.getApi(), parseContext.getSuiteName());