Move remaining logic into Transport class

Also split NodeExplanation into its own class
This commit is contained in:
Lee Hinman 2016-04-26 10:36:48 -06:00
parent 08ac66f41e
commit 49c310691c
3 changed files with 251 additions and 222 deletions

View File

@ -54,33 +54,17 @@ public final class ClusterAllocationExplanation implements ToXContent, Writeable
private final boolean primary;
private final String assignedNodeId;
private final UnassignedInfo unassignedInfo;
private final long remainingDelayNanos;
private final Set<String> activeAllocationIds;
private final long remainingDelayMillis;
private final Map<DiscoveryNode, NodeExplanation> nodeExplanations;
public ClusterAllocationExplanation(ShardId shard, boolean primary, @Nullable String assignedNodeId,
UnassignedInfo unassignedInfo, Map<DiscoveryNode, Decision> nodeToDecision,
Map<DiscoveryNode, Float> nodeToWeight, long remainingDelayNanos,
List<IndicesShardStoresResponse.StoreStatus> shardStores, Set<String> activeAllocationIds) {
public ClusterAllocationExplanation(ShardId shard, boolean primary, @Nullable String assignedNodeId, long remainingDelayMillis,
UnassignedInfo unassignedInfo, Map<DiscoveryNode, NodeExplanation> nodeExplanations) {
this.shard = shard;
this.primary = primary;
this.assignedNodeId = assignedNodeId;
this.unassignedInfo = unassignedInfo;
this.remainingDelayNanos = remainingDelayNanos;
this.activeAllocationIds = activeAllocationIds;
final Map<DiscoveryNode, Decision> nodeDecisions = nodeToDecision == null ? Collections.emptyMap() : nodeToDecision;
final Map<DiscoveryNode, Float> nodeWeights = nodeToWeight == null ? Collections.emptyMap() : nodeToWeight;
assert nodeDecisions.size() == nodeWeights.size() : "decision and weight list should be the same size";
final Map<DiscoveryNode, IndicesShardStoresResponse.StoreStatus> storeStatuses = calculateStoreStatuses(shardStores);
this.nodeExplanations = new HashMap<>(nodeDecisions.size());
for (Map.Entry<DiscoveryNode, Decision> entry : nodeDecisions.entrySet()) {
final DiscoveryNode node = entry.getKey();
final Decision decision = entry.getValue();
final NodeExplanation nodeExplanation = calculateNodeExplanation(node, decision, nodeWeights.get(node),
storeStatuses.get(node), assignedNodeId, activeAllocationIds);
nodeExplanations.put(node, nodeExplanation);
}
this.remainingDelayMillis = remainingDelayMillis;
this.nodeExplanations = nodeExplanations;
}
public ClusterAllocationExplanation(StreamInput in) throws IOException {
@ -88,14 +72,7 @@ public final class ClusterAllocationExplanation implements ToXContent, Writeable
this.primary = in.readBoolean();
this.assignedNodeId = in.readOptionalString();
this.unassignedInfo = in.readOptionalWriteable(UnassignedInfo::new);
this.remainingDelayNanos = in.readVLong();
int allocIdSize = in.readVInt();
Set<String> activeIds = new HashSet<>(allocIdSize);
for (int i = 0; i < allocIdSize; i++) {
activeIds.add(in.readString());
}
this.activeAllocationIds = activeIds;
this.remainingDelayMillis = in.readVLong();
int mapSize = in.readVInt();
Map<DiscoveryNode, NodeExplanation> nodeToExplanation = new HashMap<>(mapSize);
@ -112,11 +89,7 @@ public final class ClusterAllocationExplanation implements ToXContent, Writeable
out.writeBoolean(this.isPrimary());
out.writeOptionalString(this.getAssignedNodeId());
out.writeOptionalWriteable(this.getUnassignedInfo());
out.writeVLong(remainingDelayNanos);
out.writeVInt(activeAllocationIds.size());
for (String id : activeAllocationIds) {
out.writeString(id);
}
out.writeVLong(remainingDelayMillis);
out.writeVInt(this.nodeExplanations.size());
for (NodeExplanation explanation : this.nodeExplanations.values()) {
@ -124,67 +97,6 @@ public final class ClusterAllocationExplanation implements ToXContent, Writeable
}
}
private NodeExplanation calculateNodeExplanation(DiscoveryNode node,
Decision nodeDecision,
Float nodeWeight,
IndicesShardStoresResponse.StoreStatus storeStatus,
String assignedNodeId,
Set<String> activeAllocationIds) {
FinalDecision finalDecision;
StoreCopy storeCopy;
String finalExplanation;
if (node.getId().equals(assignedNodeId)) {
finalDecision = FinalDecision.ALREADY_ASSIGNED;
finalExplanation = "the shard is already assigned to this node";
} else if (nodeDecision.type() == Decision.Type.NO) {
finalDecision = FinalDecision.NO;
finalExplanation = "the shard cannot be assigned because one or more allocation decider returns a 'NO' decision";
} else {
finalDecision = FinalDecision.YES;
finalExplanation = "the shard can be assigned";
}
if (storeStatus != null) {
final Throwable storeErr = storeStatus.getStoreException();
if (storeErr != null) {
finalDecision = FinalDecision.NO;
if (ExceptionsHelper.unwrapCause(storeErr) instanceof CorruptIndexException) {
storeCopy = StoreCopy.CORRUPT;
finalExplanation = "the copy of data in the shard store is corrupt";
} else {
storeCopy = StoreCopy.IO_ERROR;
finalExplanation = "there was an IO error reading from data in the shard store";
}
} else if (activeAllocationIds.isEmpty() || activeAllocationIds.contains(storeStatus.getAllocationId())) {
// If either we don't have allocation IDs, or they contain the store allocation id, show the allocation
// status
storeCopy = StoreCopy.AVAILABLE;
if (finalDecision != FinalDecision.NO) {
finalExplanation = "the shard can be assigned and the node contains a valid copy of the shard data";
}
} else {
// Otherwise, this is a stale copy of the data (allocation ids don't match)
storeCopy = StoreCopy.STALE;
finalExplanation = "the copy of the shard is stale, allocation ids do not match";
finalDecision = FinalDecision.NO;
}
} else {
// No copies of the data, so deciders are what influence the decision and explanation
storeCopy = StoreCopy.NONE;
}
return new NodeExplanation(node, nodeDecision, nodeWeight, storeStatus, finalDecision, finalExplanation, storeCopy);
}
private static Map<DiscoveryNode, IndicesShardStoresResponse.StoreStatus> calculateStoreStatuses(
List<IndicesShardStoresResponse.StoreStatus> shardStores) {
Map<DiscoveryNode, IndicesShardStoresResponse.StoreStatus> nodeToStatus = new HashMap<>(shardStores.size());
for (IndicesShardStoresResponse.StoreStatus status : shardStores) {
nodeToStatus.put(status.getNode(), status);
}
return nodeToStatus;
}
/** Return the shard that the explanation is about */
public ShardId getShard() {
return this.shard;
@ -213,13 +125,8 @@ public final class ClusterAllocationExplanation implements ToXContent, Writeable
}
/** Return the remaining allocation delay for this shard in nanoseconds */
public long getRemainingDelayNanos() {
return this.remainingDelayNanos;
}
/** Return a set of the active allocation ids for this shard */
public Set<String> getActiveAllocationIds() {
return this.activeAllocationIds;
public long getRemainingDelayMillis() {
return this.remainingDelayMillis;
}
/** Return a map of node to the explanation for that node */
@ -245,10 +152,8 @@ public final class ClusterAllocationExplanation implements ToXContent, Writeable
if (unassignedInfo != null) {
unassignedInfo.toXContent(builder, params);
long delay = unassignedInfo.getLastComputedLeftDelayNanos();
builder.field("allocation_delay", TimeValue.timeValueNanos(delay));
builder.field("allocation_delay_ms", TimeValue.timeValueNanos(delay).millis());
builder.field("remaining_delay", TimeValue.timeValueNanos(remainingDelayNanos));
builder.field("remaining_delay_ms", TimeValue.timeValueNanos(remainingDelayNanos).millis());
builder.timeValueField("allocation_delay_ms", "allocation_delay", TimeValue.timeValueNanos(delay));
builder.timeValueField("remaining_delay_ms", "remaining_delay", TimeValue.timeValueMillis(remainingDelayMillis));
}
builder.startObject("nodes");
for (NodeExplanation explanation : nodeExplanations.values()) {
@ -260,116 +165,6 @@ public final class ClusterAllocationExplanation implements ToXContent, Writeable
return builder;
}
/** The cluster allocation explanation for a single node */
public class NodeExplanation implements Writeable, ToXContent {
private final DiscoveryNode node;
private final Decision nodeDecision;
private final Float nodeWeight;
private final IndicesShardStoresResponse.StoreStatus storeStatus;
private final FinalDecision finalDecision;
private final String finalExplanation;
private final StoreCopy storeCopy;
public NodeExplanation(final DiscoveryNode node, final Decision nodeDecision, final Float nodeWeight,
final @Nullable IndicesShardStoresResponse.StoreStatus storeStatus,
final FinalDecision finalDecision, final String finalExplanation, final StoreCopy storeCopy) {
this.node = node;
this.nodeDecision = nodeDecision;
this.nodeWeight = nodeWeight;
this.storeStatus = storeStatus;
this.finalDecision = finalDecision;
this.finalExplanation = finalExplanation;
this.storeCopy = storeCopy;
}
public NodeExplanation(StreamInput in) throws IOException {
this.node = new DiscoveryNode(in);
this.nodeDecision = Decision.readFrom(in);
this.nodeWeight = in.readFloat();
if (in.readBoolean()) {
this.storeStatus = IndicesShardStoresResponse.StoreStatus.readStoreStatus(in);
} else {
this.storeStatus = null;
}
this.finalDecision = FinalDecision.readFrom(in);
this.finalExplanation = in.readString();
this.storeCopy = StoreCopy.readFrom(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
node.writeTo(out);
Decision.writeTo(nodeDecision, out);
out.writeFloat(nodeWeight);
if (storeStatus == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
storeStatus.writeTo(out);
}
finalDecision.writeTo(out);
out.writeString(finalExplanation);
storeCopy.writeTo(out);
}
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(node.getId()); {
builder.field("node_name", node.getName());
builder.startObject("node_attributes"); {
for (Map.Entry<String, String> attrEntry : node.getAttributes().entrySet()) {
builder.field(attrEntry.getKey(), attrEntry.getValue());
}
}
builder.endObject(); // end attributes
builder.startObject("store"); {
builder.field("shard_copy", storeCopy.toString());
if (storeStatus != null) {
final Throwable storeErr = storeStatus.getStoreException();
if (storeErr != null) {
builder.field("store_exception", ExceptionsHelper.detailedMessage(storeErr));
}
}
}
builder.endObject(); // end store
builder.field("final_decision", finalDecision.toString());
builder.field("final_explanation", finalExplanation.toString());
builder.field("weight", nodeWeight);
nodeDecision.toXContent(builder, params);
}
builder.endObject(); // end node <uuid>
return builder;
}
public DiscoveryNode getNode() {
return this.node;
}
public Decision getDecision() {
return this.nodeDecision;
}
public Float getWeight() {
return this.nodeWeight;
}
@Nullable
public IndicesShardStoresResponse.StoreStatus getStoreStatus() {
return this.storeStatus;
}
public FinalDecision getFinalDecision() {
return this.finalDecision;
}
public String getFinalExplanation() {
return this.finalExplanation;
}
public StoreCopy getStoreCopy() {
return this.storeCopy;
}
}
/** An Enum representing the final decision for a shard allocation on a node */
public enum FinalDecision {
// Yes, the shard can be assigned
@ -426,7 +221,9 @@ public final class ClusterAllocationExplanation implements ToXContent, Writeable
// There was an error reading this node's copy of the data
IO_ERROR((byte) 3),
// The copy of the data on the node is stale
STALE((byte) 4);
STALE((byte) 4),
// It's unknown what the copy of the data is
UNKNOWN((byte) 5);
private final byte id;
@ -441,6 +238,7 @@ public final class ClusterAllocationExplanation implements ToXContent, Writeable
case 2: return CORRUPT;
case 3: return IO_ERROR;
case 4: return STALE;
case 5: return UNKNOWN;
default:
throw new IllegalArgumentException("unknown id for store copy: [" + id + "]");
}
@ -454,6 +252,7 @@ public final class ClusterAllocationExplanation implements ToXContent, Writeable
case 2: return "CORRUPT";
case 3: return "IO_ERROR";
case 4: return "STALE";
case 5: return "UNKNOWN";
default:
throw new IllegalArgumentException("unknown id for store copy: [" + id + "]");
}

View File

@ -0,0 +1,145 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.allocation;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.Map;
/** The cluster allocation explanation for a single node */
public class NodeExplanation implements Writeable, ToXContent {
private final DiscoveryNode node;
private final Decision nodeDecision;
private final Float nodeWeight;
private final IndicesShardStoresResponse.StoreStatus storeStatus;
private final ClusterAllocationExplanation.FinalDecision finalDecision;
private final ClusterAllocationExplanation.StoreCopy storeCopy;
private final String finalExplanation;
public NodeExplanation(final DiscoveryNode node, final Decision nodeDecision, final Float nodeWeight,
final @Nullable IndicesShardStoresResponse.StoreStatus storeStatus,
final ClusterAllocationExplanation.FinalDecision finalDecision,
final String finalExplanation,
final ClusterAllocationExplanation.StoreCopy storeCopy) {
this.node = node;
this.nodeDecision = nodeDecision;
this.nodeWeight = nodeWeight;
this.storeStatus = storeStatus;
this.finalDecision = finalDecision;
this.finalExplanation = finalExplanation;
this.storeCopy = storeCopy;
}
public NodeExplanation(StreamInput in) throws IOException {
this.node = new DiscoveryNode(in);
this.nodeDecision = Decision.readFrom(in);
this.nodeWeight = in.readFloat();
if (in.readBoolean()) {
this.storeStatus = IndicesShardStoresResponse.StoreStatus.readStoreStatus(in);
} else {
this.storeStatus = null;
}
this.finalDecision = ClusterAllocationExplanation.FinalDecision.readFrom(in);
this.finalExplanation = in.readString();
this.storeCopy = ClusterAllocationExplanation.StoreCopy.readFrom(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
node.writeTo(out);
Decision.writeTo(nodeDecision, out);
out.writeFloat(nodeWeight);
if (storeStatus == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
storeStatus.writeTo(out);
}
finalDecision.writeTo(out);
out.writeString(finalExplanation);
storeCopy.writeTo(out);
}
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(node.getId()); {
builder.field("node_name", node.getName());
builder.startObject("node_attributes"); {
for (Map.Entry<String, String> attrEntry : node.getAttributes().entrySet()) {
builder.field(attrEntry.getKey(), attrEntry.getValue());
}
}
builder.endObject(); // end attributes
builder.startObject("store"); {
builder.field("shard_copy", storeCopy.toString());
if (storeStatus != null) {
final Throwable storeErr = storeStatus.getStoreException();
if (storeErr != null) {
builder.field("store_exception", ExceptionsHelper.detailedMessage(storeErr));
}
}
}
builder.endObject(); // end store
builder.field("final_decision", finalDecision.toString());
builder.field("final_explanation", finalExplanation.toString());
builder.field("weight", nodeWeight);
nodeDecision.toXContent(builder, params);
}
builder.endObject(); // end node <uuid>
return builder;
}
public DiscoveryNode getNode() {
return this.node;
}
public Decision getDecision() {
return this.nodeDecision;
}
public Float getWeight() {
return this.nodeWeight;
}
@Nullable
public IndicesShardStoresResponse.StoreStatus getStoreStatus() {
return this.storeStatus;
}
public ClusterAllocationExplanation.FinalDecision getFinalDecision() {
return this.finalDecision;
}
public String getFinalExplanation() {
return this.finalExplanation;
}
public ClusterAllocationExplanation.StoreCopy getStoreCopy() {
return this.storeCopy;
}
}

View File

@ -20,7 +20,9 @@
package org.elasticsearch.action.admin.cluster.allocation;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.apache.lucene.index.CorruptIndexException;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresRequest;
import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse;
@ -53,6 +55,7 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenIntMap;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@ -60,6 +63,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* The {@code TransportClusterAllocationExplainAction} is responsible for actually executing the explanation of a shard's allocation on the
@ -125,6 +129,68 @@ public class TransportClusterAllocationExplainAction
}
}
/**
* Construct a {@code NodeExplanation} object for the given shard given all the metadata. This also attempts to construct the human
* readable FinalDecision and final explanation as part of the explanation.
*/
public static NodeExplanation calculateNodeExplanation(ShardRouting shard,
IndexMetaData indexMetaData,
DiscoveryNode node,
Decision nodeDecision,
Float nodeWeight,
IndicesShardStoresResponse.StoreStatus storeStatus,
String assignedNodeId,
Set<String> activeAllocationIds) {
ClusterAllocationExplanation.FinalDecision finalDecision;
ClusterAllocationExplanation.StoreCopy storeCopy;
String finalExplanation;
if (node.getId().equals(assignedNodeId)) {
finalDecision = ClusterAllocationExplanation.FinalDecision.ALREADY_ASSIGNED;
finalExplanation = "the shard is already assigned to this node";
} else if (nodeDecision.type() == Decision.Type.NO) {
finalDecision = ClusterAllocationExplanation.FinalDecision.NO;
finalExplanation = "the shard cannot be assigned because one or more allocation decider returns a 'NO' decision";
} else {
finalDecision = ClusterAllocationExplanation.FinalDecision.YES;
finalExplanation = "the shard can be assigned";
}
if (storeStatus != null) {
final Throwable storeErr = storeStatus.getStoreException();
// The store error only influences the decision if the shard is primary and has not been allocated before
if (storeErr != null && shard.primary() && shard.allocatedPostIndexCreate(indexMetaData) == false) {
finalDecision = ClusterAllocationExplanation.FinalDecision.NO;
if (ExceptionsHelper.unwrapCause(storeErr) instanceof CorruptIndexException) {
storeCopy = ClusterAllocationExplanation.StoreCopy.CORRUPT;
finalExplanation = "the copy of data in the shard store is corrupt";
} else {
storeCopy = ClusterAllocationExplanation.StoreCopy.IO_ERROR;
finalExplanation = "there was an IO error reading from data in the shard store";
}
} else if (activeAllocationIds.isEmpty()) {
// The ids are only empty if dealing with a legacy index
// TODO: fetch the shard state versions and display here?
storeCopy = ClusterAllocationExplanation.StoreCopy.UNKNOWN;
} else if (activeAllocationIds.contains(storeStatus.getAllocationId())) {
storeCopy = ClusterAllocationExplanation.StoreCopy.AVAILABLE;
if (finalDecision == ClusterAllocationExplanation.FinalDecision.YES) {
finalExplanation = "the shard can be assigned and the node contains a valid copy of the shard data";
}
} else {
// Otherwise, this is a stale copy of the data (allocation ids don't match)
storeCopy = ClusterAllocationExplanation.StoreCopy.STALE;
finalExplanation = "the copy of the shard is stale, allocation ids do not match";
finalDecision = ClusterAllocationExplanation.FinalDecision.NO;
}
} else {
// No copies of the data, so deciders are what influence the decision and explanation
storeCopy = ClusterAllocationExplanation.StoreCopy.NONE;
}
return new NodeExplanation(node, nodeDecision, nodeWeight, storeStatus, finalDecision, finalExplanation, storeCopy);
}
/**
* For the given {@code ShardRouting}, return the explanation of the allocation for that shard on all nodes. If {@code
* includeYesDecisions} is true, returns all decisions, otherwise returns only 'NO' and 'THROTTLE' decisions.
@ -147,16 +213,35 @@ public class TransportClusterAllocationExplainAction
nodeToDecision.put(discoNode, d);
}
}
long remainingDelayNanos = 0;
long remainingDelayMillis = 0;
final MetaData metadata = allocation.metaData();
final IndexMetaData indexMetaData = metadata.index(shard.index());
if (ui != null) {
final Settings indexSettings = indexMetaData.getSettings();
remainingDelayNanos = ui.getRemainingDelay(System.nanoTime(), metadata.settings(), indexSettings);
long remainingDelayNanos = ui.getRemainingDelay(System.nanoTime(), metadata.settings(), indexSettings);
remainingDelayMillis = TimeValue.timeValueNanos(remainingDelayNanos).millis();
}
return new ClusterAllocationExplanation(shard.shardId(), shard.primary(), shard.currentNodeId(), ui, nodeToDecision,
shardAllocator.weighShard(allocation, shard), remainingDelayNanos, shardStores,
indexMetaData.activeAllocationIds(shard.getId()));
// Calculate weights for each of the nodes
Map<DiscoveryNode, Float> weights = shardAllocator.weighShard(allocation, shard);
Map<DiscoveryNode, IndicesShardStoresResponse.StoreStatus> nodeToStatus = new HashMap<>(shardStores.size());
for (IndicesShardStoresResponse.StoreStatus status : shardStores) {
nodeToStatus.put(status.getNode(), status);
}
Map<DiscoveryNode, NodeExplanation> explanations = new HashMap<>(shardStores.size());
for (Map.Entry<DiscoveryNode, Decision> entry : nodeToDecision.entrySet()) {
DiscoveryNode node = entry.getKey();
Decision decision = entry.getValue();
Float weight = weights.get(node);
IndicesShardStoresResponse.StoreStatus storeStatus = nodeToStatus.get(node);
NodeExplanation nodeExplanation = calculateNodeExplanation(shard, indexMetaData, node, decision, weight,
storeStatus, shard.currentNodeId(), indexMetaData.activeAllocationIds(shard.getId()));
explanations.put(node, nodeExplanation);
}
return new ClusterAllocationExplanation(shard.shardId(), shard.primary(),
shard.currentNodeId(), remainingDelayMillis, ui, explanations);
}
@Override