Merge remote-tracking branch 'dakrone/add-store-status-to-explain'

This commit is contained in:
Lee Hinman 2016-04-28 13:47:55 -06:00
commit a0ba3c9a02
7 changed files with 734 additions and 122 deletions

View File

@ -21,7 +21,6 @@ package org.elasticsearch.action.admin.cluster.allocation;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
@ -32,7 +31,6 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import java.io.IOException; import java.io.IOException;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
@ -45,21 +43,18 @@ public final class ClusterAllocationExplanation implements ToXContent, Writeable
private final ShardId shard; private final ShardId shard;
private final boolean primary; private final boolean primary;
private final String assignedNodeId; private final String assignedNodeId;
private final Map<DiscoveryNode, Decision> nodeToDecision;
private final Map<DiscoveryNode, Float> nodeWeights;
private final UnassignedInfo unassignedInfo; private final UnassignedInfo unassignedInfo;
private final long remainingDelayNanos; private final long remainingDelayMillis;
private final Map<DiscoveryNode, NodeExplanation> nodeExplanations;
public ClusterAllocationExplanation(ShardId shard, boolean primary, @Nullable String assignedNodeId, public ClusterAllocationExplanation(ShardId shard, boolean primary, @Nullable String assignedNodeId, long remainingDelayMillis,
UnassignedInfo unassignedInfo, Map<DiscoveryNode, Decision> nodeToDecision, @Nullable UnassignedInfo unassignedInfo, Map<DiscoveryNode, NodeExplanation> nodeExplanations) {
Map<DiscoveryNode, Float> nodeWeights, long remainingDelayNanos) {
this.shard = shard; this.shard = shard;
this.primary = primary; this.primary = primary;
this.assignedNodeId = assignedNodeId; this.assignedNodeId = assignedNodeId;
this.unassignedInfo = unassignedInfo; this.unassignedInfo = unassignedInfo;
this.nodeToDecision = nodeToDecision == null ? Collections.emptyMap() : nodeToDecision; this.remainingDelayMillis = remainingDelayMillis;
this.nodeWeights = nodeWeights == null ? Collections.emptyMap() : nodeWeights; this.nodeExplanations = nodeExplanations;
this.remainingDelayNanos = remainingDelayNanos;
} }
public ClusterAllocationExplanation(StreamInput in) throws IOException { public ClusterAllocationExplanation(StreamInput in) throws IOException {
@ -67,27 +62,15 @@ public final class ClusterAllocationExplanation implements ToXContent, Writeable
this.primary = in.readBoolean(); this.primary = in.readBoolean();
this.assignedNodeId = in.readOptionalString(); this.assignedNodeId = in.readOptionalString();
this.unassignedInfo = in.readOptionalWriteable(UnassignedInfo::new); this.unassignedInfo = in.readOptionalWriteable(UnassignedInfo::new);
this.remainingDelayMillis = in.readVLong();
Map<DiscoveryNode, Decision> ntd = null; int mapSize = in.readVInt();
int size = in.readVInt(); Map<DiscoveryNode, NodeExplanation> nodeToExplanation = new HashMap<>(mapSize);
ntd = new HashMap<>(size); for (int i = 0; i < mapSize; i++) {
for (int i = 0; i < size; i++) { NodeExplanation nodeExplanation = new NodeExplanation(in);
DiscoveryNode dn = new DiscoveryNode(in); nodeToExplanation.put(nodeExplanation.getNode(), nodeExplanation);
Decision decision = Decision.readFrom(in);
ntd.put(dn, decision);
} }
this.nodeToDecision = ntd; this.nodeExplanations = nodeToExplanation;
Map<DiscoveryNode, Float> ntw = null;
size = in.readVInt();
ntw = new HashMap<>(size);
for (int i = 0; i < size; i++) {
DiscoveryNode dn = new DiscoveryNode(in);
float weight = in.readFloat();
ntw.put(dn, weight);
}
this.nodeWeights = ntw;
remainingDelayNanos = in.readVLong();
} }
@Override @Override
@ -96,27 +79,20 @@ public final class ClusterAllocationExplanation implements ToXContent, Writeable
out.writeBoolean(this.isPrimary()); out.writeBoolean(this.isPrimary());
out.writeOptionalString(this.getAssignedNodeId()); out.writeOptionalString(this.getAssignedNodeId());
out.writeOptionalWriteable(this.getUnassignedInfo()); out.writeOptionalWriteable(this.getUnassignedInfo());
out.writeVLong(remainingDelayMillis);
Map<DiscoveryNode, Decision> ntd = this.getNodeDecisions(); out.writeVInt(this.nodeExplanations.size());
out.writeVInt(ntd.size()); for (NodeExplanation explanation : this.nodeExplanations.values()) {
for (Map.Entry<DiscoveryNode, Decision> entry : ntd.entrySet()) { explanation.writeTo(out);
entry.getKey().writeTo(out);
Decision.writeTo(entry.getValue(), out);
} }
Map<DiscoveryNode, Float> ntw = this.getNodeWeights();
out.writeVInt(ntw.size());
for (Map.Entry<DiscoveryNode, Float> entry : ntw.entrySet()) {
entry.getKey().writeTo(out);
out.writeFloat(entry.getValue());
}
out.writeVLong(remainingDelayNanos);
} }
/** Return the shard that the explanation is about */
public ShardId getShard() { public ShardId getShard() {
return this.shard; return this.shard;
} }
/** Return true if the explained shard is primary, false otherwise */
public boolean isPrimary() { public boolean isPrimary() {
return this.primary; return this.primary;
} }
@ -138,22 +114,14 @@ public final class ClusterAllocationExplanation implements ToXContent, Writeable
return this.unassignedInfo; return this.unassignedInfo;
} }
/** Return a map of node to decision for shard allocation */ /** Return the remaining allocation delay for this shard in milliseconds */
public Map<DiscoveryNode, Decision> getNodeDecisions() { public long getRemainingDelayMillis() {
return this.nodeToDecision; return this.remainingDelayMillis;
} }
/** /** Return a map of node to the explanation for that node */
* Return a map of node to balancer "weight" for allocation. Higher weights mean the balancer wants to allocated the shard to that node public Map<DiscoveryNode, NodeExplanation> getNodeExplanations() {
* more return this.nodeExplanations;
*/
public Map<DiscoveryNode, Float> getNodeWeights() {
return this.nodeWeights;
}
/** Return the remaining allocation delay for this shard in nanoseconds */
public long getRemainingDelayNanos() {
return this.remainingDelayNanos;
} }
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
@ -174,36 +142,118 @@ public final class ClusterAllocationExplanation implements ToXContent, Writeable
if (unassignedInfo != null) { if (unassignedInfo != null) {
unassignedInfo.toXContent(builder, params); unassignedInfo.toXContent(builder, params);
long delay = unassignedInfo.getLastComputedLeftDelayNanos(); long delay = unassignedInfo.getLastComputedLeftDelayNanos();
builder.field("allocation_delay", TimeValue.timeValueNanos(delay)); builder.timeValueField("allocation_delay_in_millis", "allocation_delay", TimeValue.timeValueNanos(delay));
builder.field("allocation_delay_ms", TimeValue.timeValueNanos(delay).millis()); builder.timeValueField("remaining_delay_in_millis", "remaining_delay", TimeValue.timeValueMillis(remainingDelayMillis));
builder.field("remaining_delay", TimeValue.timeValueNanos(remainingDelayNanos));
builder.field("remaining_delay_ms", TimeValue.timeValueNanos(remainingDelayNanos).millis());
} }
builder.startObject("nodes"); builder.startObject("nodes");
for (Map.Entry<DiscoveryNode, Float> entry : nodeWeights.entrySet()) { for (NodeExplanation explanation : nodeExplanations.values()) {
DiscoveryNode node = entry.getKey(); explanation.toXContent(builder, params);
builder.startObject(node.getId()); {
builder.field("node_name", node.getName());
builder.startObject("node_attributes"); {
for (Map.Entry<String, String> attrEntry : node.getAttributes().entrySet()) {
builder.field(attrEntry.getKey(), attrEntry.getValue());
}
}
builder.endObject(); // end attributes
Decision d = nodeToDecision.get(node);
if (node.getId().equals(assignedNodeId)) {
builder.field("final_decision", "CURRENTLY_ASSIGNED");
} else {
builder.field("final_decision", d.type().toString());
}
builder.field("weight", entry.getValue());
d.toXContent(builder, params);
}
builder.endObject(); // end node <uuid>
} }
builder.endObject(); // end nodes builder.endObject(); // end nodes
} }
builder.endObject(); // end wrapping object builder.endObject(); // end wrapping object
return builder; return builder;
} }
/** The overall verdict on whether a shard may be allocated to a particular node. */
public enum FinalDecision {
    /** The shard can be assigned. */
    YES((byte) 0),
    /** The shard cannot be assigned. */
    NO((byte) 1),
    /** The shard is already assigned to this node. */
    ALREADY_ASSIGNED((byte) 2);

    /** Identifier written to and read back from the stream for this constant. */
    private final byte id;

    FinalDecision(byte id) {
        this.id = id;
    }

    /**
     * Resolve a stream identifier to its constant.
     *
     * @throws IllegalArgumentException if no constant carries the given id
     */
    private static FinalDecision fromId(byte id) {
        for (FinalDecision candidate : values()) {
            if (candidate.id == id) {
                return candidate;
            }
        }
        throw new IllegalArgumentException("unknown id for final decision: [" + id + "]");
    }

    /** Returns the constant's name, e.g. {@code "ALREADY_ASSIGNED"}. */
    @Override
    public String toString() {
        return name();
    }

    /** Read one byte from the stream and map it to the matching constant. */
    static FinalDecision readFrom(StreamInput in) throws IOException {
        final byte wireId = in.readByte();
        return fromId(wireId);
    }

    /** Write this constant's single-byte identifier to the stream. */
    void writeTo(StreamOutput out) throws IOException {
        out.writeByte(id);
    }
}
/** The state of a node's on-disk copy of a shard's data, as seen by the shard store. */
public enum StoreCopy {
    /** No data for this shard is on the node. */
    NONE((byte) 0),
    /** A copy of the data is available on this node. */
    AVAILABLE((byte) 1),
    /** The copy of the data on the node is corrupt. */
    CORRUPT((byte) 2),
    /** There was an error reading this node's copy of the data. */
    IO_ERROR((byte) 3),
    /** The copy of the data on the node is stale. */
    STALE((byte) 4),
    /** It is unknown what the copy of the data is. */
    UNKNOWN((byte) 5);

    /** Identifier written to and read back from the stream for this constant. */
    private final byte id;

    StoreCopy(byte id) {
        this.id = id;
    }

    /**
     * Resolve a stream identifier to its constant.
     *
     * @throws IllegalArgumentException if no constant carries the given id
     */
    private static StoreCopy fromId(byte id) {
        for (StoreCopy candidate : values()) {
            if (candidate.id == id) {
                return candidate;
            }
        }
        throw new IllegalArgumentException("unknown id for store copy: [" + id + "]");
    }

    /** Returns the constant's name, e.g. {@code "IO_ERROR"}. */
    @Override
    public String toString() {
        return name();
    }

    /** Read one byte from the stream and map it to the matching constant. */
    static StoreCopy readFrom(StreamInput in) throws IOException {
        final byte wireId = in.readByte();
        return fromId(wireId);
    }

    /** Write this constant's single-byte identifier to the stream. */
    void writeTo(StreamOutput out) throws IOException {
        out.writeByte(id);
    }
}
} }

View File

@ -0,0 +1,145 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.allocation;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.Map;
/**
 * The cluster allocation explanation for a single node: the allocation decision and weight computed for
 * the node, the state of any copy of the shard found in the node's store, and the final decision and
 * explanation text derived from them.
 */
public class NodeExplanation implements Writeable, ToXContent {
    private final DiscoveryNode node;
    private final Decision nodeDecision;
    private final Float nodeWeight;
    private final IndicesShardStoresResponse.StoreStatus storeStatus;
    private final ClusterAllocationExplanation.FinalDecision finalDecision;
    private final ClusterAllocationExplanation.StoreCopy storeCopy;
    private final String finalExplanation;

    /**
     * Create an explanation from already-computed parts.
     *
     * @param node             the node this explanation is about
     * @param nodeDecision     the allocation decision for the node
     * @param nodeWeight       the weight for the node
     * @param storeStatus      the store status reported for the node, or {@code null} if there is none
     * @param finalDecision    the final decision for the node
     * @param finalExplanation text explaining the final decision
     * @param storeCopy        the state of the node's copy of the shard data
     */
    public NodeExplanation(final DiscoveryNode node, final Decision nodeDecision, final Float nodeWeight,
                           final @Nullable IndicesShardStoresResponse.StoreStatus storeStatus,
                           final ClusterAllocationExplanation.FinalDecision finalDecision,
                           final String finalExplanation,
                           final ClusterAllocationExplanation.StoreCopy storeCopy) {
        this.node = node;
        this.nodeDecision = nodeDecision;
        this.nodeWeight = nodeWeight;
        this.storeStatus = storeStatus;
        this.finalDecision = finalDecision;
        this.finalExplanation = finalExplanation;
        this.storeCopy = storeCopy;
    }

    /**
     * Read an explanation from the stream; fields are consumed in the order {@link #writeTo} emits them.
     *
     * @throws IOException if reading from the stream fails
     */
    public NodeExplanation(StreamInput in) throws IOException {
        this.node = new DiscoveryNode(in);
        this.nodeDecision = Decision.readFrom(in);
        this.nodeWeight = in.readFloat();
        // A boolean marker precedes the optional store status
        if (in.readBoolean()) {
            this.storeStatus = IndicesShardStoresResponse.StoreStatus.readStoreStatus(in);
        } else {
            this.storeStatus = null;
        }
        this.finalDecision = ClusterAllocationExplanation.FinalDecision.readFrom(in);
        this.finalExplanation = in.readString();
        this.storeCopy = ClusterAllocationExplanation.StoreCopy.readFrom(in);
    }

    /**
     * Write this explanation to the stream: node, decision, weight, optional store status (preceded by a
     * presence marker), final decision, final explanation and store copy.
     *
     * @throws IOException if writing to the stream fails
     */
    @Override
    public void writeTo(StreamOutput out) throws IOException {
        node.writeTo(out);
        Decision.writeTo(nodeDecision, out);
        // NOTE(review): nodeWeight is a boxed Float that is unboxed here, so a null weight would throw;
        // confirm that callers always supply one.
        out.writeFloat(nodeWeight);
        if (storeStatus == null) {
            out.writeBoolean(false);
        } else {
            out.writeBoolean(true);
            storeStatus.writeTo(out);
        }
        finalDecision.writeTo(out);
        out.writeString(finalExplanation);
        storeCopy.writeTo(out);
    }

    /**
     * Render this explanation as an object keyed by the node's id. The object holds {@code node_name},
     * {@code node_attributes}, a {@code store} object ({@code shard_copy} plus {@code store_exception} when
     * the store status carries an exception), {@code final_decision}, {@code final_explanation},
     * {@code weight}, and then whatever the node decision itself renders.
     *
     * @return the builder that was passed in
     * @throws IOException if the builder fails to write
     */
    @Override
    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
        builder.startObject(node.getId()); {
            builder.field("node_name", node.getName());
            builder.startObject("node_attributes"); {
                for (Map.Entry<String, String> attrEntry : node.getAttributes().entrySet()) {
                    builder.field(attrEntry.getKey(), attrEntry.getValue());
                }
            }
            builder.endObject(); // end attributes
            builder.startObject("store"); {
                builder.field("shard_copy", storeCopy.toString());
                if (storeStatus != null) {
                    final Throwable storeErr = storeStatus.getStoreException();
                    if (storeErr != null) {
                        builder.field("store_exception", ExceptionsHelper.detailedMessage(storeErr));
                    }
                }
            }
            builder.endObject(); // end store
            builder.field("final_decision", finalDecision.toString());
            builder.field("final_explanation", finalExplanation);
            builder.field("weight", nodeWeight);
            nodeDecision.toXContent(builder, params);
        }
        builder.endObject(); // end node <uuid>
        return builder;
    }

    /** Return the node this explanation is about */
    public DiscoveryNode getNode() {
        return this.node;
    }

    /** Return the allocation decision for the node */
    public Decision getDecision() {
        return this.nodeDecision;
    }

    /** Return the weight for the node */
    public Float getWeight() {
        return this.nodeWeight;
    }

    /** Return the store status reported for the node, or {@code null} if there is none */
    @Nullable
    public IndicesShardStoresResponse.StoreStatus getStoreStatus() {
        return this.storeStatus;
    }

    /** Return the final decision for the node */
    public ClusterAllocationExplanation.FinalDecision getFinalDecision() {
        return this.finalDecision;
    }

    /** Return the text explaining the final decision */
    public String getFinalExplanation() {
        return this.finalExplanation;
    }

    /** Return the state of the node's copy of the shard data */
    public ClusterAllocationExplanation.StoreCopy getStoreCopy() {
        return this.storeCopy;
    }
}

View File

@ -20,8 +20,13 @@
package org.elasticsearch.action.admin.cluster.allocation; package org.elasticsearch.action.admin.cluster.allocation;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.apache.lucene.index.CorruptIndexException;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresRequest;
import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse;
import org.elasticsearch.action.admin.indices.shards.TransportIndicesShardStoresAction;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterInfoService; import org.elasticsearch.cluster.ClusterInfoService;
@ -47,8 +52,10 @@ import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.cluster.routing.allocation.decider.Decision; import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenIntMap;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
@ -56,6 +63,7 @@ import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
/** /**
* The {@code TransportClusterAllocationExplainAction} is responsible for actually executing the explanation of a shard's allocation on the * The {@code TransportClusterAllocationExplainAction} is responsible for actually executing the explanation of a shard's allocation on the
@ -68,19 +76,22 @@ public class TransportClusterAllocationExplainAction
private final ClusterInfoService clusterInfoService; private final ClusterInfoService clusterInfoService;
private final AllocationDeciders allocationDeciders; private final AllocationDeciders allocationDeciders;
private final ShardsAllocator shardAllocator; private final ShardsAllocator shardAllocator;
private final TransportIndicesShardStoresAction shardStoresAction;
@Inject @Inject
public TransportClusterAllocationExplainAction(Settings settings, TransportService transportService, ClusterService clusterService, public TransportClusterAllocationExplainAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters, ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, IndexNameExpressionResolver indexNameExpressionResolver,
AllocationService allocationService, ClusterInfoService clusterInfoService, AllocationService allocationService, ClusterInfoService clusterInfoService,
AllocationDeciders allocationDeciders, ShardsAllocator shardAllocator) { AllocationDeciders allocationDeciders, ShardsAllocator shardAllocator,
TransportIndicesShardStoresAction shardStoresAction) {
super(settings, ClusterAllocationExplainAction.NAME, transportService, clusterService, threadPool, actionFilters, super(settings, ClusterAllocationExplainAction.NAME, transportService, clusterService, threadPool, actionFilters,
indexNameExpressionResolver, ClusterAllocationExplainRequest::new); indexNameExpressionResolver, ClusterAllocationExplainRequest::new);
this.allocationService = allocationService; this.allocationService = allocationService;
this.clusterInfoService = clusterInfoService; this.clusterInfoService = clusterInfoService;
this.allocationDeciders = allocationDeciders; this.allocationDeciders = allocationDeciders;
this.shardAllocator = shardAllocator; this.shardAllocator = shardAllocator;
this.shardStoresAction = shardStoresAction;
} }
@Override @Override
@ -118,12 +129,86 @@ public class TransportClusterAllocationExplainAction
} }
} }
/**
 * Build the {@code NodeExplanation} for one shard on one node from the supplied metadata, deriving the
 * state of the node's store copy, the final decision and its human readable explanation along the way.
 */
public static NodeExplanation calculateNodeExplanation(ShardRouting shard,
                                                       IndexMetaData indexMetaData,
                                                       DiscoveryNode node,
                                                       Decision nodeDecision,
                                                       Float nodeWeight,
                                                       IndicesShardStoresResponse.StoreStatus storeStatus,
                                                       String assignedNodeId,
                                                       Set<String> activeAllocationIds) {
    final ClusterAllocationExplanation.StoreCopy storeCopy = classifyStoreCopy(storeStatus, activeAllocationIds);

    final ClusterAllocationExplanation.FinalDecision finalDecision;
    final String finalExplanation;
    if (node.getId().equals(assignedNodeId)) {
        finalDecision = ClusterAllocationExplanation.FinalDecision.ALREADY_ASSIGNED;
        finalExplanation = "the shard is already assigned to this node";
    } else {
        final String unusableCopyReason = describeUnusablePrimaryCopy(shard, indexMetaData, storeCopy);
        if (unusableCopyReason != null) {
            // The state of the store copy rules this node out regardless of the deciders
            finalDecision = ClusterAllocationExplanation.FinalDecision.NO;
            finalExplanation = unusableCopyReason;
        } else if (nodeDecision.type() == Decision.Type.NO) {
            finalDecision = ClusterAllocationExplanation.FinalDecision.NO;
            finalExplanation = "the shard cannot be assigned because one or more allocation decider returns a 'NO' decision";
        } else {
            finalDecision = ClusterAllocationExplanation.FinalDecision.YES;
            finalExplanation = storeCopy == ClusterAllocationExplanation.StoreCopy.AVAILABLE
                ? "the shard can be assigned and the node contains a valid copy of the shard data"
                : "the shard can be assigned";
        }
    }
    return new NodeExplanation(node, nodeDecision, nodeWeight, storeStatus, finalDecision, finalExplanation, storeCopy);
}

/** Classify the node's copy of the shard data from its store status and the active allocation ids. */
private static ClusterAllocationExplanation.StoreCopy classifyStoreCopy(IndicesShardStoresResponse.StoreStatus storeStatus,
                                                                        Set<String> activeAllocationIds) {
    if (storeStatus == null) {
        // No copies of the data
        return ClusterAllocationExplanation.StoreCopy.NONE;
    }
    final Throwable storeFailure = storeStatus.getStoreException();
    if (storeFailure != null) {
        final boolean corrupted = ExceptionsHelper.unwrapCause(storeFailure) instanceof CorruptIndexException;
        return corrupted ? ClusterAllocationExplanation.StoreCopy.CORRUPT : ClusterAllocationExplanation.StoreCopy.IO_ERROR;
    }
    if (activeAllocationIds.isEmpty()) {
        // The ids are only empty if dealing with a legacy index
        // TODO: fetch the shard state versions and display here?
        return ClusterAllocationExplanation.StoreCopy.UNKNOWN;
    }
    if (activeAllocationIds.contains(storeStatus.getAllocationId())) {
        return ClusterAllocationExplanation.StoreCopy.AVAILABLE;
    }
    // Otherwise, this is a stale copy of the data (allocation ids don't match)
    return ClusterAllocationExplanation.StoreCopy.STALE;
}

/**
 * Return the reason the store copy prevents allocating an unassigned primary, or {@code null} when the
 * shard is not an unassigned primary or the copy's state does not rule the node out.
 */
private static String describeUnusablePrimaryCopy(ShardRouting shard, IndexMetaData indexMetaData,
                                                  ClusterAllocationExplanation.StoreCopy storeCopy) {
    if ((shard.primary() && shard.unassigned()) == false) {
        return null;
    }
    // Stale and missing copies only count against the node when this check on the shard holds
    final boolean allocatedPostCreate = shard.allocatedPostIndexCreate(indexMetaData);
    switch (storeCopy) {
        case STALE:
            return allocatedPostCreate ? "the copy of the shard is stale, allocation ids do not match" : null;
        case NONE:
            return allocatedPostCreate ? "there is no copy of the shard available" : null;
        case CORRUPT:
            return "the copy of the shard is corrupt";
        case IO_ERROR:
            return "the copy of the shard cannot be read";
        default:
            return null;
    }
}
/** /**
* For the given {@code ShardRouting}, return the explanation of the allocation for that shard on all nodes. If {@code * For the given {@code ShardRouting}, return the explanation of the allocation for that shard on all nodes. If {@code
* includeYesDecisions} is true, returns all decisions, otherwise returns only 'NO' and 'THROTTLE' decisions. * includeYesDecisions} is true, returns all decisions, otherwise returns only 'NO' and 'THROTTLE' decisions.
*/ */
public static ClusterAllocationExplanation explainShard(ShardRouting shard, RoutingAllocation allocation, RoutingNodes routingNodes, public static ClusterAllocationExplanation explainShard(ShardRouting shard, RoutingAllocation allocation, RoutingNodes routingNodes,
boolean includeYesDecisions, ShardsAllocator shardAllocator) { boolean includeYesDecisions, ShardsAllocator shardAllocator,
List<IndicesShardStoresResponse.StoreStatus> shardStores) {
// don't short circuit deciders, we want a full explanation // don't short circuit deciders, we want a full explanation
allocation.debugDecision(true); allocation.debugDecision(true);
// get the existing unassigned info if available // get the existing unassigned info if available
@ -139,14 +224,35 @@ public class TransportClusterAllocationExplainAction
nodeToDecision.put(discoNode, d); nodeToDecision.put(discoNode, d);
} }
} }
long remainingDelayNanos = 0; long remainingDelayMillis = 0;
final MetaData metadata = allocation.metaData();
final IndexMetaData indexMetaData = metadata.index(shard.index());
if (ui != null) { if (ui != null) {
final MetaData metadata = allocation.metaData(); final Settings indexSettings = indexMetaData.getSettings();
final Settings indexSettings = metadata.index(shard.index()).getSettings(); long remainingDelayNanos = ui.getRemainingDelay(System.nanoTime(), metadata.settings(), indexSettings);
remainingDelayNanos = ui.getRemainingDelay(System.nanoTime(), metadata.settings(), indexSettings); remainingDelayMillis = TimeValue.timeValueNanos(remainingDelayNanos).millis();
} }
return new ClusterAllocationExplanation(shard.shardId(), shard.primary(), shard.currentNodeId(), ui, nodeToDecision,
shardAllocator.weighShard(allocation, shard), remainingDelayNanos); // Calculate weights for each of the nodes
Map<DiscoveryNode, Float> weights = shardAllocator.weighShard(allocation, shard);
Map<DiscoveryNode, IndicesShardStoresResponse.StoreStatus> nodeToStatus = new HashMap<>(shardStores.size());
for (IndicesShardStoresResponse.StoreStatus status : shardStores) {
nodeToStatus.put(status.getNode(), status);
}
Map<DiscoveryNode, NodeExplanation> explanations = new HashMap<>(shardStores.size());
for (Map.Entry<DiscoveryNode, Decision> entry : nodeToDecision.entrySet()) {
DiscoveryNode node = entry.getKey();
Decision decision = entry.getValue();
Float weight = weights.get(node);
IndicesShardStoresResponse.StoreStatus storeStatus = nodeToStatus.get(node);
NodeExplanation nodeExplanation = calculateNodeExplanation(shard, indexMetaData, node, decision, weight,
storeStatus, shard.currentNodeId(), indexMetaData.activeAllocationIds(shard.getId()));
explanations.put(node, nodeExplanation);
}
return new ClusterAllocationExplanation(shard.shardId(), shard.primary(),
shard.currentNodeId(), remainingDelayMillis, ui, explanations);
} }
@Override @Override
@ -156,30 +262,30 @@ public class TransportClusterAllocationExplainAction
final RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, state.nodes(), final RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, state.nodes(),
clusterInfoService.getClusterInfo(), System.nanoTime()); clusterInfoService.getClusterInfo(), System.nanoTime());
ShardRouting shardRouting = null; ShardRouting foundShard = null;
if (request.useAnyUnassignedShard()) { if (request.useAnyUnassignedShard()) {
// If we can use any shard, just pick the first unassigned one (if there are any) // If we can use any shard, just pick the first unassigned one (if there are any)
RoutingNodes.UnassignedShards.UnassignedIterator ui = routingNodes.unassigned().iterator(); RoutingNodes.UnassignedShards.UnassignedIterator ui = routingNodes.unassigned().iterator();
if (ui.hasNext()) { if (ui.hasNext()) {
shardRouting = ui.next(); foundShard = ui.next();
} }
} else { } else {
String index = request.getIndex(); String index = request.getIndex();
int shard = request.getShard(); int shard = request.getShard();
if (request.isPrimary()) { if (request.isPrimary()) {
// If we're looking for the primary shard, there's only one copy, so pick it directly // If we're looking for the primary shard, there's only one copy, so pick it directly
shardRouting = allocation.routingTable().shardRoutingTable(index, shard).primaryShard(); foundShard = allocation.routingTable().shardRoutingTable(index, shard).primaryShard();
} else { } else {
// If looking for a replica, go through all the replica shards // If looking for a replica, go through all the replica shards
List<ShardRouting> replicaShardRoutings = allocation.routingTable().shardRoutingTable(index, shard).replicaShards(); List<ShardRouting> replicaShardRoutings = allocation.routingTable().shardRoutingTable(index, shard).replicaShards();
if (replicaShardRoutings.size() > 0) { if (replicaShardRoutings.size() > 0) {
// Pick the first replica at the very least // Pick the first replica at the very least
shardRouting = replicaShardRoutings.get(0); foundShard = replicaShardRoutings.get(0);
// In case there are multiple replicas where some are assigned and some aren't, // In case there are multiple replicas where some are assigned and some aren't,
// try to find one that is unassigned at least // try to find one that is unassigned at least
for (ShardRouting replica : replicaShardRoutings) { for (ShardRouting replica : replicaShardRoutings) {
if (replica.unassigned()) { if (replica.unassigned()) {
shardRouting = replica; foundShard = replica;
break; break;
} }
} }
@ -187,14 +293,34 @@ public class TransportClusterAllocationExplainAction
} }
} }
if (shardRouting == null) { if (foundShard == null) {
listener.onFailure(new ElasticsearchException("unable to find any shards to explain [{}] in the routing table", request)); listener.onFailure(new ElasticsearchException("unable to find any shards to explain [{}] in the routing table", request));
return; return;
} }
final ShardRouting shardRouting = foundShard;
logger.debug("explaining the allocation for [{}], found shard [{}]", request, shardRouting); logger.debug("explaining the allocation for [{}], found shard [{}]", request, shardRouting);
ClusterAllocationExplanation cae = explainShard(shardRouting, allocation, routingNodes, getShardStores(shardRouting, new ActionListener<IndicesShardStoresResponse>() {
request.includeYesDecisions(), shardAllocator); @Override
listener.onResponse(new ClusterAllocationExplainResponse(cae)); public void onResponse(IndicesShardStoresResponse shardStoreResponse) {
ImmutableOpenIntMap<List<IndicesShardStoresResponse.StoreStatus>> shardStatuses =
shardStoreResponse.getStoreStatuses().get(shardRouting.getIndexName());
List<IndicesShardStoresResponse.StoreStatus> shardStoreStatus = shardStatuses.get(shardRouting.id());
ClusterAllocationExplanation cae = explainShard(shardRouting, allocation, routingNodes,
request.includeYesDecisions(), shardAllocator, shardStoreStatus);
listener.onResponse(new ClusterAllocationExplainResponse(cae));
}
@Override
public void onFailure(Throwable e) {
listener.onFailure(e);
}
});
}
private void getShardStores(ShardRouting shard, final ActionListener<IndicesShardStoresResponse> listener) {
IndicesShardStoresRequest request = new IndicesShardStoresRequest(shard.getIndexName());
request.shardStatuses("all");
shardStoresAction.execute(request, listener);
} }
} }

View File

@ -164,7 +164,7 @@ public class IndicesShardStoresResponse extends ActionResponse implements ToXCon
return allocationStatus; return allocationStatus;
} }
static StoreStatus readStoreStatus(StreamInput in) throws IOException { public static StoreStatus readStoreStatus(StreamInput in) throws IOException {
StoreStatus storeStatus = new StoreStatus(); StoreStatus storeStatus = new StoreStatus();
storeStatus.readFrom(in); storeStatus.readFrom(in);
return storeStatus; return storeStatus;

View File

@ -20,14 +20,21 @@
package org.elasticsearch.action.admin.cluster.allocation; package org.elasticsearch.action.admin.cluster.allocation;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESSingleNodeTestCase; import org.elasticsearch.test.ESSingleNodeTestCase;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@ -68,6 +75,101 @@ public final class ClusterAllocationExplainIT extends ESIntegTestCase {
assertThat(cae.getShard().getIndexName(), equalTo("test")); assertThat(cae.getShard().getIndexName(), equalTo("test"));
assertFalse(cae.isPrimary()); assertFalse(cae.isPrimary());
assertFalse(cae.isAssigned()); assertFalse(cae.isAssigned());
assertThat("expecting a remaining delay, got: " + cae.getRemainingDelayNanos(), cae.getRemainingDelayNanos(), greaterThan(0L)); assertThat("expecting a remaining delay, got: " + cae.getRemainingDelayMillis(), cae.getRemainingDelayMillis(), greaterThan(0L));
}
public void testUnassignedShards() throws Exception {
logger.info("--> starting 3 nodes");
String noAttrNode = internalCluster().startNode();
String barAttrNode = internalCluster().startNode(Settings.builder().put("node.attr.bar", "baz"));
String fooBarAttrNode = internalCluster().startNode(Settings.builder()
.put("node.attr.foo", "bar")
.put("node.attr.bar", "baz"));
// Wait for all 3 nodes to be up
logger.info("--> waiting for 3 nodes to be up");
client().admin().cluster().health(Requests.clusterHealthRequest().waitForNodes("3")).actionGet();
client().admin().indices().prepareCreate("anywhere")
.setSettings(Settings.builder()
.put("index.number_of_shards", 5)
.put("index.number_of_replicas", 1))
.get();
client().admin().indices().prepareCreate("only-baz")
.setSettings(Settings.builder()
.put("index.routing.allocation.include.bar", "baz")
.put("index.number_of_shards", 5)
.put("index.number_of_replicas", 1))
.get();
client().admin().indices().prepareCreate("only-foo")
.setSettings(Settings.builder()
.put("index.routing.allocation.include.foo", "bar")
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 1))
.get();
ensureGreen("anywhere", "only-baz");
ensureYellow("only-foo");
ClusterAllocationExplainResponse resp = client().admin().cluster().prepareAllocationExplain()
.setIndex("only-foo")
.setShard(0)
.setPrimary(false)
.get();
ClusterAllocationExplanation cae = resp.getExplanation();
assertThat(cae.getShard().getIndexName(), equalTo("only-foo"));
assertFalse(cae.isPrimary());
assertFalse(cae.isAssigned());
assertThat(UnassignedInfo.Reason.INDEX_CREATED, equalTo(cae.getUnassignedInfo().getReason()));
assertThat("expecting no remaining delay: " + cae.getRemainingDelayMillis(), cae.getRemainingDelayMillis(), equalTo(0L));
Map<DiscoveryNode, NodeExplanation> explanations = cae.getNodeExplanations();
Float noAttrWeight = -1f;
Float barAttrWeight = -1f;
Float fooBarAttrWeight = -1f;
for (Map.Entry<DiscoveryNode, NodeExplanation> entry : explanations.entrySet()) {
DiscoveryNode node = entry.getKey();
String nodeName = node.getName();
NodeExplanation explanation = entry.getValue();
ClusterAllocationExplanation.FinalDecision finalDecision = explanation.getFinalDecision();
String finalExplanation = explanation.getFinalExplanation();
ClusterAllocationExplanation.StoreCopy storeCopy = explanation.getStoreCopy();
Decision d = explanation.getDecision();
float weight = explanation.getWeight();
IndicesShardStoresResponse.StoreStatus storeStatus = explanation.getStoreStatus();
assertEquals(d.type(), Decision.Type.NO);
if (noAttrNode.equals(nodeName)) {
assertThat(d.toString(), containsString("node does not match index include filters [foo:\"bar\"]"));
noAttrWeight = weight;
assertNull(storeStatus);
assertEquals("the shard cannot be assigned because one or more allocation decider returns a 'NO' decision",
explanation.getFinalExplanation());
assertEquals(ClusterAllocationExplanation.FinalDecision.NO, finalDecision);
} else if (barAttrNode.equals(nodeName)) {
assertThat(d.toString(), containsString("node does not match index include filters [foo:\"bar\"]"));
barAttrWeight = weight;
assertNull(storeStatus);
assertEquals("the shard cannot be assigned because one or more allocation decider returns a 'NO' decision",
explanation.getFinalExplanation());
assertEquals(ClusterAllocationExplanation.FinalDecision.NO, finalDecision);
} else if (fooBarAttrNode.equals(nodeName)) {
assertThat(d.toString(), containsString("the shard cannot be allocated on the same node id"));
fooBarAttrWeight = weight;
assertEquals(storeStatus.getAllocationStatus(),
IndicesShardStoresResponse.StoreStatus.AllocationStatus.PRIMARY);
assertEquals(ClusterAllocationExplanation.FinalDecision.NO, finalDecision);
assertEquals(ClusterAllocationExplanation.StoreCopy.AVAILABLE, storeCopy);
assertEquals("the shard cannot be assigned because one or more allocation decider returns a 'NO' decision",
explanation.getFinalExplanation());
} else {
fail("unexpected node with name: " + nodeName +
", I have: " + noAttrNode + ", " + barAttrNode + ", " + fooBarAttrNode);
}
}
assertFalse(barAttrWeight == fooBarAttrWeight);
} }
} }

View File

@ -43,16 +43,22 @@ public final class ClusterAllocationExplainTests extends ESSingleNodeTestCase {
assertEquals(false, cae.isPrimary()); assertEquals(false, cae.isPrimary());
assertNull(cae.getAssignedNodeId()); assertNull(cae.getAssignedNodeId());
assertNotNull(cae.getUnassignedInfo()); assertNotNull(cae.getUnassignedInfo());
Decision d = cae.getNodeDecisions().values().iterator().next(); NodeExplanation explanation = cae.getNodeExplanations().values().iterator().next();
ClusterAllocationExplanation.FinalDecision fd = explanation.getFinalDecision();
ClusterAllocationExplanation.StoreCopy storeCopy = explanation.getStoreCopy();
String finalExplanation = explanation.getFinalExplanation();
Decision d = explanation.getDecision();
assertNotNull("should have a decision", d); assertNotNull("should have a decision", d);
assertEquals(Decision.Type.NO, d.type()); assertEquals(Decision.Type.NO, d.type());
assertEquals(ClusterAllocationExplanation.FinalDecision.NO, fd);
assertEquals(ClusterAllocationExplanation.StoreCopy.AVAILABLE, storeCopy);
assertTrue(d.toString(), d.toString().contains("NO(the shard cannot be allocated on the same node id")); assertTrue(d.toString(), d.toString().contains("NO(the shard cannot be allocated on the same node id"));
assertTrue(d instanceof Decision.Multi); assertTrue(d instanceof Decision.Multi);
Decision.Multi md = (Decision.Multi) d; Decision.Multi md = (Decision.Multi) d;
Decision ssd = md.getDecisions().get(0); Decision ssd = md.getDecisions().get(0);
assertEquals(Decision.Type.NO, ssd.type()); assertEquals(Decision.Type.NO, ssd.type());
assertTrue(ssd.toString(), ssd.toString().contains("NO(the shard cannot be allocated on the same node id")); assertTrue(ssd.toString(), ssd.toString().contains("NO(the shard cannot be allocated on the same node id"));
Float weight = cae.getNodeWeights().values().iterator().next(); Float weight = explanation.getWeight();
assertNotNull("should have a weight", weight); assertNotNull("should have a weight", weight);
resp = client().admin().cluster().prepareAllocationExplain().setIndex("test").setShard(0).setPrimary(true).get(); resp = client().admin().cluster().prepareAllocationExplain().setIndex("test").setShard(0).setPrimary(true).get();
@ -64,16 +70,22 @@ public final class ClusterAllocationExplainTests extends ESSingleNodeTestCase {
assertEquals(true, cae.isPrimary()); assertEquals(true, cae.isPrimary());
assertNotNull("shard should have assigned node id", cae.getAssignedNodeId()); assertNotNull("shard should have assigned node id", cae.getAssignedNodeId());
assertNull("assigned shard should not have unassigned info", cae.getUnassignedInfo()); assertNull("assigned shard should not have unassigned info", cae.getUnassignedInfo());
d = cae.getNodeDecisions().values().iterator().next(); explanation = cae.getNodeExplanations().values().iterator().next();
d = explanation.getDecision();
fd = explanation.getFinalDecision();
storeCopy = explanation.getStoreCopy();
finalExplanation = explanation.getFinalExplanation();
assertNotNull("should have a decision", d); assertNotNull("should have a decision", d);
assertEquals(Decision.Type.NO, d.type()); assertEquals(Decision.Type.NO, d.type());
assertEquals(ClusterAllocationExplanation.FinalDecision.ALREADY_ASSIGNED, fd);
assertEquals(ClusterAllocationExplanation.StoreCopy.AVAILABLE, storeCopy);
assertTrue(d.toString(), d.toString().contains("NO(the shard cannot be allocated on the same node id")); assertTrue(d.toString(), d.toString().contains("NO(the shard cannot be allocated on the same node id"));
assertTrue(d instanceof Decision.Multi); assertTrue(d instanceof Decision.Multi);
md = (Decision.Multi) d; md = (Decision.Multi) d;
ssd = md.getDecisions().get(0); ssd = md.getDecisions().get(0);
assertEquals(Decision.Type.NO, ssd.type()); assertEquals(Decision.Type.NO, ssd.type());
assertTrue(ssd.toString(), ssd.toString().contains("NO(the shard cannot be allocated on the same node id")); assertTrue(ssd.toString(), ssd.toString().contains("NO(the shard cannot be allocated on the same node id"));
weight = cae.getNodeWeights().values().iterator().next(); weight = explanation.getWeight();
assertNotNull("should have a weight", weight); assertNotNull("should have a weight", weight);
resp = client().admin().cluster().prepareAllocationExplain().useAnyUnassignedShard().get(); resp = client().admin().cluster().prepareAllocationExplain().useAnyUnassignedShard().get();

View File

@ -19,17 +19,36 @@
package org.elasticsearch.action.admin.cluster.allocation; package org.elasticsearch.action.admin.cluster.allocation;
import org.apache.lucene.index.CorruptIndexException;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingHelper;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.allocation.decider.Decision; import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.DummyTransportAddress; import org.elasticsearch.common.transport.DummyTransportAddress;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import static java.util.Collections.emptyMap; import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet; import static java.util.Collections.emptySet;
@ -39,6 +58,131 @@ import static java.util.Collections.emptySet;
*/ */
public final class ClusterAllocationExplanationTests extends ESTestCase { public final class ClusterAllocationExplanationTests extends ESTestCase {
// Shared fixtures for the tests in this class.
// NOTE(review): the Index arguments are presumably (name, uuid) -- confirm against Index.
private Index i = new Index("foo", "uuid");
// Unassigned routing entries for shard 0 of the index above, both with reason INDEX_CREATED.
// NOTE(review): the boolean argument is presumably the "primary" flag, as the field names suggest.
private ShardRouting primaryShard = ShardRouting.newUnassigned(i, 0, null, true,
new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo"));
private ShardRouting replicaShard = ShardRouting.newUnassigned(i, 0, null, false,
new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo"));
// Metadata for "foo": one shard, one replica, with "aid1" and "aid2" registered as the
// active allocation ids of shard 0.
private IndexMetaData indexMetaData = IndexMetaData.builder("foo")
.settings(Settings.builder()
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.SETTING_INDEX_UUID, "uuid"))
.putActiveAllocationIds(0, new HashSet<String>(Arrays.asList("aid1", "aid2")))
.numberOfShards(1)
.numberOfReplicas(1)
.build();
// The single node every explanation in these tests is computed for.
private DiscoveryNode node = new DiscoveryNode("node-0", DummyTransportAddress.INSTANCE, emptyMap(), emptySet(), Version.CURRENT);
// Multi-decisions populated by the static initializer below: one holds a single YES
// decision, the other a single NO decision.
private static Decision.Multi yesDecision = new Decision.Multi();
private static Decision.Multi noDecision = new Decision.Multi();
static {
yesDecision.add(Decision.single(Decision.Type.YES, "yes label", "yes please"));
noDecision.add(Decision.single(Decision.Type.NO, "no label", "no thanks"));
}
/**
 * Builds a {@link NodeExplanation} for the shared {@code node} fixture using the NO decision
 * and a random weight.
 *
 * @param primary     explain the primary fixture shard when true, the replica otherwise
 * @param isAssigned  when true the assigned node id passed on is "node-0" (the first
 *                    constructor argument of the {@code node} fixture), otherwise "node-9"
 * @param hasErr      when true the store status carries an ElasticsearchException
 * @param hasActiveId when true "eggplant" (the store status' allocation id) is passed as an
 *                    active allocation id
 */
private NodeExplanation makeNodeExplanation(boolean primary, boolean isAssigned, boolean hasErr, boolean hasActiveId) {
    final Float weight = randomFloat();

    Exception storeFailure = null;
    if (hasErr) {
        storeFailure = new ElasticsearchException("stuff's broke, yo");
    }
    final IndicesShardStoresResponse.StoreStatus status = new IndicesShardStoresResponse.StoreStatus(node, 42, "eggplant",
            IndicesShardStoresResponse.StoreStatus.AllocationStatus.PRIMARY, storeFailure);

    final String assignedTo = isAssigned ? "node-0" : "node-9";

    final Set<String> activeIds = new HashSet<>();
    if (hasActiveId) {
        activeIds.add("eggplant");
    }

    final ShardRouting explained = primary ? primaryShard : replicaShard;
    return TransportClusterAllocationExplainAction.calculateNodeExplanation(explained,
            indexMetaData, node, noDecision, weight, status, assignedTo, activeIds);
}
/**
 * Asserts, in this order, that {@code ne} reports the given final explanation text, final
 * decision and store-copy state.
 */
private void assertExplanations(NodeExplanation ne, String finalExplanation, ClusterAllocationExplanation.FinalDecision finalDecision,
                                ClusterAllocationExplanation.StoreCopy storeCopy) {
    final String actualExplanation = ne.getFinalExplanation();
    assertEquals(finalExplanation, actualExplanation);

    final ClusterAllocationExplanation.FinalDecision actualDecision = ne.getFinalDecision();
    assertEquals(finalDecision, actualDecision);

    final ClusterAllocationExplanation.StoreCopy actualCopy = ne.getStoreCopy();
    assertEquals(storeCopy, actualCopy);
}
/**
 * Feeds {@code calculateNodeExplanation} a sequence of decision / store-status / assignment
 * combinations and checks the final explanation, final decision and store-copy state that
 * come back for each one.
 */
public void testDecisionAndExplanation() {
    final Exception ioFailure = new IOException("stuff's broke, yo");
    final Exception corruption = new CorruptIndexException("stuff's corrupt, yo", "");
    final Float weight = randomFloat();
    final Set<String> activeIds = new HashSet<>(Arrays.asList("eggplant"));
    final ShardRouting reopenedPrimary = ShardRouting.newUnassigned(i, 0, null, true,
            new UnassignedInfo(UnassignedInfo.Reason.INDEX_REOPENED, "foo"));
    assertTrue(reopenedPrimary.allocatedPostIndexCreate(indexMetaData));

    final String deciderSaidNo = "the shard cannot be assigned because one or more allocation decider returns a 'NO' decision";

    // YES decision, store status carrying an IOException
    final IndicesShardStoresResponse.StoreStatus unreadableStore = new IndicesShardStoresResponse.StoreStatus(node, 42,
            "eggplant", IndicesShardStoresResponse.StoreStatus.AllocationStatus.PRIMARY, ioFailure);
    assertExplanations(
            TransportClusterAllocationExplainAction.calculateNodeExplanation(primaryShard, indexMetaData, node,
                    yesDecision, weight, unreadableStore, "", activeIds),
            "the copy of the shard cannot be read",
            ClusterAllocationExplanation.FinalDecision.NO, ClusterAllocationExplanation.StoreCopy.IO_ERROR);

    // YES decision, no store status, INDEX_CREATED primary
    assertExplanations(
            TransportClusterAllocationExplainAction.calculateNodeExplanation(primaryShard, indexMetaData, node,
                    yesDecision, weight, null, "", activeIds),
            "the shard can be assigned",
            ClusterAllocationExplanation.FinalDecision.YES, ClusterAllocationExplanation.StoreCopy.NONE);

    // YES decision, no store status, INDEX_REOPENED primary
    assertExplanations(
            TransportClusterAllocationExplainAction.calculateNodeExplanation(reopenedPrimary, indexMetaData, node,
                    yesDecision, weight, null, "", activeIds),
            "there is no copy of the shard available",
            ClusterAllocationExplanation.FinalDecision.NO, ClusterAllocationExplanation.StoreCopy.NONE);

    // NO decision, no store status
    assertExplanations(
            TransportClusterAllocationExplainAction.calculateNodeExplanation(primaryShard, indexMetaData, node,
                    noDecision, weight, null, "", activeIds),
            deciderSaidNo,
            ClusterAllocationExplanation.FinalDecision.NO, ClusterAllocationExplanation.StoreCopy.NONE);

    // NO decision, error-free store status with allocation id "eggplant"
    final IndicesShardStoresResponse.StoreStatus blockedStore = new IndicesShardStoresResponse.StoreStatus(node, 42,
            "eggplant", IndicesShardStoresResponse.StoreStatus.AllocationStatus.PRIMARY, null);
    assertExplanations(
            TransportClusterAllocationExplainAction.calculateNodeExplanation(primaryShard, indexMetaData, node,
                    noDecision, weight, blockedStore, "", activeIds),
            deciderSaidNo,
            ClusterAllocationExplanation.FinalDecision.NO, ClusterAllocationExplanation.StoreCopy.AVAILABLE);

    // YES decision, store status carrying a CorruptIndexException
    final IndicesShardStoresResponse.StoreStatus corruptStore = new IndicesShardStoresResponse.StoreStatus(node, 42,
            "eggplant", IndicesShardStoresResponse.StoreStatus.AllocationStatus.PRIMARY, corruption);
    assertExplanations(
            TransportClusterAllocationExplainAction.calculateNodeExplanation(primaryShard, indexMetaData, node,
                    yesDecision, weight, corruptStore, "", activeIds),
            "the copy of the shard is corrupt",
            ClusterAllocationExplanation.FinalDecision.NO, ClusterAllocationExplanation.StoreCopy.CORRUPT);

    // YES decision, allocation id "banana" is not among the active ids, INDEX_CREATED primary
    final IndicesShardStoresResponse.StoreStatus staleStore = new IndicesShardStoresResponse.StoreStatus(node, 42,
            "banana", IndicesShardStoresResponse.StoreStatus.AllocationStatus.PRIMARY, null);
    assertExplanations(
            TransportClusterAllocationExplainAction.calculateNodeExplanation(primaryShard, indexMetaData, node,
                    yesDecision, weight, staleStore, "", activeIds),
            "the shard can be assigned",
            ClusterAllocationExplanation.FinalDecision.YES, ClusterAllocationExplanation.StoreCopy.STALE);

    // Same "banana" store status, INDEX_REOPENED primary
    final IndicesShardStoresResponse.StoreStatus staleReopenedStore = new IndicesShardStoresResponse.StoreStatus(node, 42,
            "banana", IndicesShardStoresResponse.StoreStatus.AllocationStatus.PRIMARY, null);
    assertExplanations(
            TransportClusterAllocationExplainAction.calculateNodeExplanation(reopenedPrimary, indexMetaData, node,
                    yesDecision, weight, staleReopenedStore, "", activeIds),
            "the copy of the shard is stale, allocation ids do not match",
            ClusterAllocationExplanation.FinalDecision.NO, ClusterAllocationExplanation.StoreCopy.STALE);

    // YES decision, error-free store status, assigned node id "node-0"
    final IndicesShardStoresResponse.StoreStatus assignedStore = new IndicesShardStoresResponse.StoreStatus(node, 42,
            "eggplant", IndicesShardStoresResponse.StoreStatus.AllocationStatus.PRIMARY, null);
    assertExplanations(
            TransportClusterAllocationExplainAction.calculateNodeExplanation(primaryShard, indexMetaData, node,
                    yesDecision, weight, assignedStore, "node-0", activeIds),
            "the shard is already assigned to this node",
            ClusterAllocationExplanation.FinalDecision.ALREADY_ASSIGNED, ClusterAllocationExplanation.StoreCopy.AVAILABLE);

    // YES decision, error-free store status, empty assigned node id
    final IndicesShardStoresResponse.StoreStatus validStore = new IndicesShardStoresResponse.StoreStatus(node, 42,
            "eggplant", IndicesShardStoresResponse.StoreStatus.AllocationStatus.PRIMARY, null);
    assertExplanations(
            TransportClusterAllocationExplainAction.calculateNodeExplanation(primaryShard, indexMetaData, node,
                    yesDecision, weight, validStore, "", activeIds),
            "the shard can be assigned and the node contains a valid copy of the shard data",
            ClusterAllocationExplanation.FinalDecision.YES, ClusterAllocationExplanation.StoreCopy.AVAILABLE);
}
public void testDecisionEquality() { public void testDecisionEquality() {
Decision.Multi d = new Decision.Multi(); Decision.Multi d = new Decision.Multi();
Decision.Multi d2 = new Decision.Multi(); Decision.Multi d2 = new Decision.Multi();
@ -53,21 +197,19 @@ public final class ClusterAllocationExplanationTests extends ESTestCase {
public void testExplanationSerialization() throws Exception { public void testExplanationSerialization() throws Exception {
ShardId shard = new ShardId("test", "uuid", 0); ShardId shard = new ShardId("test", "uuid", 0);
Map<DiscoveryNode, Decision> nodeToDecisions = new HashMap<>();
Map<DiscoveryNode, Float> nodeToWeight = new HashMap<>();
for (int i = randomIntBetween(2, 5); i > 0; i--) {
DiscoveryNode dn = new DiscoveryNode("node-" + i, DummyTransportAddress.INSTANCE, emptyMap(), emptySet(), Version.CURRENT);
Decision.Multi d = new Decision.Multi();
d.add(Decision.single(Decision.Type.NO, "no label", "because I said no"));
d.add(Decision.single(Decision.Type.YES, "yes label", "yes please"));
d.add(Decision.single(Decision.Type.THROTTLE, "throttle label", "wait a sec"));
nodeToDecisions.put(dn, d);
nodeToWeight.put(dn, randomFloat());
}
long remainingDelay = randomIntBetween(0, 500); long remainingDelay = randomIntBetween(0, 500);
ClusterAllocationExplanation cae = new ClusterAllocationExplanation(shard, true, "assignedNode", null, Map<DiscoveryNode, NodeExplanation> nodeExplanations = new HashMap<>(1);
nodeToDecisions, nodeToWeight, remainingDelay); Float nodeWeight = randomFloat();
Set<String> activeAllocationIds = new HashSet<>();
activeAllocationIds.add("eggplant");
IndicesShardStoresResponse.StoreStatus storeStatus = new IndicesShardStoresResponse.StoreStatus(node, 42, "eggplant",
IndicesShardStoresResponse.StoreStatus.AllocationStatus.PRIMARY, null);
NodeExplanation ne = TransportClusterAllocationExplainAction.calculateNodeExplanation(primaryShard, indexMetaData, node,
yesDecision, nodeWeight, storeStatus, "", activeAllocationIds);
nodeExplanations.put(ne.getNode(), ne);
ClusterAllocationExplanation cae = new ClusterAllocationExplanation(shard, true,
"assignedNode", remainingDelay, null, nodeExplanations);
BytesStreamOutput out = new BytesStreamOutput(); BytesStreamOutput out = new BytesStreamOutput();
cae.writeTo(out); cae.writeTo(out);
StreamInput in = StreamInput.wrap(out.bytes()); StreamInput in = StreamInput.wrap(out.bytes());
@ -77,10 +219,45 @@ public final class ClusterAllocationExplanationTests extends ESTestCase {
assertTrue(cae2.isAssigned()); assertTrue(cae2.isAssigned());
assertEquals("assignedNode", cae2.getAssignedNodeId()); assertEquals("assignedNode", cae2.getAssignedNodeId());
assertNull(cae2.getUnassignedInfo()); assertNull(cae2.getUnassignedInfo());
for (Map.Entry<DiscoveryNode, Decision> entry : cae2.getNodeDecisions().entrySet()) { assertEquals(remainingDelay, cae2.getRemainingDelayMillis());
assertEquals(nodeToDecisions.get(entry.getKey()), entry.getValue()); for (Map.Entry<DiscoveryNode, NodeExplanation> entry : cae2.getNodeExplanations().entrySet()) {
DiscoveryNode node = entry.getKey();
NodeExplanation explanation = entry.getValue();
IndicesShardStoresResponse.StoreStatus status = explanation.getStoreStatus();
assertNotNull(explanation.getStoreStatus());
assertNotNull(explanation.getDecision());
assertEquals(nodeWeight, explanation.getWeight());
} }
assertEquals(nodeToWeight, cae2.getNodeWeights()); }
assertEquals(remainingDelay, cae2.getRemainingDelayNanos());
public void testExplanationToXContent() throws Exception {
ShardId shardId = new ShardId("foo", "uuid", 0);
long remainingDelay = 42;
Decision.Multi d = new Decision.Multi();
d.add(Decision.single(Decision.Type.NO, "no label", "because I said no"));
d.add(Decision.single(Decision.Type.YES, "yes label", "yes please"));
d.add(Decision.single(Decision.Type.THROTTLE, "throttle label", "wait a sec"));
Float nodeWeight = 1.5f;
Set<String> allocationIds = new HashSet<>();
allocationIds.add("bar");
IndicesShardStoresResponse.StoreStatus storeStatus = new IndicesShardStoresResponse.StoreStatus(node, 42, "eggplant",
IndicesShardStoresResponse.StoreStatus.AllocationStatus.PRIMARY, new ElasticsearchException("stuff's broke, yo"));
NodeExplanation ne = TransportClusterAllocationExplainAction.calculateNodeExplanation(primaryShard, indexMetaData, node,
d, nodeWeight, storeStatus, "node-0", allocationIds);
Map<DiscoveryNode, NodeExplanation> nodeExplanations = new HashMap<>(1);
nodeExplanations.put(ne.getNode(), ne);
ClusterAllocationExplanation cae = new ClusterAllocationExplanation(shardId, true,
"assignedNode", remainingDelay, null, nodeExplanations);
XContentBuilder builder = XContentFactory.jsonBuilder();
cae.toXContent(builder, ToXContent.EMPTY_PARAMS);
assertEquals("{\"shard\":{\"index\":\"foo\",\"index_uuid\":\"uuid\",\"id\":0,\"primary\":true},\"assigned\":true," +
"\"assigned_node_id\":\"assignedNode\",\"nodes\":{\"node-0\":{\"node_name\":\"\",\"node_attribute" +
"s\":{},\"store\":{\"shard_copy\":\"IO_ERROR\",\"store_exception\":\"ElasticsearchException[stuff" +
"'s broke, yo]\"},\"final_decision\":\"ALREADY_ASSIGNED\",\"final_explanation\":\"the shard is al" +
"ready assigned to this node\",\"weight\":1.5,\"decisions\":[{\"decider\":\"no label\",\"decision" +
"\":\"NO\",\"explanation\":\"because I said no\"},{\"decider\":\"yes label\",\"decision\":\"YES\"" +
",\"explanation\":\"yes please\"},{\"decider\":\"throttle label\",\"decision\":\"THROTTLE\",\"exp" +
"lanation\":\"wait a sec\"}]}}}",
builder.string());
} }
} }