Separates decision making from decision application in BalancedShardsAllocator (#20634)
Refactors the BalancedShardsAllocator to create a method that provides an allocation decision for allocating a single unassigned shard or a single started shard that can no longer remain on its current node. Having a separate method that provides a detailed decision on the allocation of a single shard will enable the cluster allocation explain API to directly invoke these methods to provide allocation explanations.
This commit is contained in:
parent
8329bf145a
commit
3d2e885157
|
@ -127,7 +127,7 @@ public class TransportClusterAllocationExplainAction
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Construct a {@code NodeExplanation} object for the given shard given all the metadata. This also attempts to construct the human
|
* Construct a {@code WeightedDecision} object for the given shard given all the metadata. This also attempts to construct the human
|
||||||
* readable FinalDecision and final explanation as part of the explanation.
|
* readable FinalDecision and final explanation as part of the explanation.
|
||||||
*/
|
*/
|
||||||
public static NodeExplanation calculateNodeExplanation(ShardRouting shard,
|
public static NodeExplanation calculateNodeExplanation(ShardRouting shard,
|
||||||
|
|
|
@ -185,15 +185,15 @@ public final class UnassignedInfo implements ToXContent, Writeable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static AllocationStatus fromDecision(Decision decision) {
|
public static AllocationStatus fromDecision(Decision.Type decision) {
|
||||||
Objects.requireNonNull(decision);
|
Objects.requireNonNull(decision);
|
||||||
switch (decision.type()) {
|
switch (decision) {
|
||||||
case NO:
|
case NO:
|
||||||
return DECIDERS_NO;
|
return DECIDERS_NO;
|
||||||
case THROTTLE:
|
case THROTTLE:
|
||||||
return DECIDERS_THROTTLED;
|
return DECIDERS_THROTTLED;
|
||||||
default:
|
default:
|
||||||
throw new IllegalArgumentException("no allocation attempt from decision[" + decision.type() + "]");
|
throw new IllegalArgumentException("no allocation attempt from decision[" + decision + "]");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -35,7 +35,7 @@ import java.util.Objects;
|
||||||
public class ShardAllocationDecision {
|
public class ShardAllocationDecision {
|
||||||
/** a constant representing a shard decision where no decision was taken */
|
/** a constant representing a shard decision where no decision was taken */
|
||||||
public static final ShardAllocationDecision DECISION_NOT_TAKEN =
|
public static final ShardAllocationDecision DECISION_NOT_TAKEN =
|
||||||
new ShardAllocationDecision(null, null, null, null, null, null);
|
new ShardAllocationDecision(null, null, null, null, null, null, null);
|
||||||
/**
|
/**
|
||||||
* a map of cached common no/throttle decisions that don't need explanations,
|
* a map of cached common no/throttle decisions that don't need explanations,
|
||||||
* this helps prevent unnecessary object allocations for the non-explain API case
|
* this helps prevent unnecessary object allocations for the non-explain API case
|
||||||
|
@ -44,15 +44,15 @@ public class ShardAllocationDecision {
|
||||||
static {
|
static {
|
||||||
Map<AllocationStatus, ShardAllocationDecision> cachedDecisions = new HashMap<>();
|
Map<AllocationStatus, ShardAllocationDecision> cachedDecisions = new HashMap<>();
|
||||||
cachedDecisions.put(AllocationStatus.FETCHING_SHARD_DATA,
|
cachedDecisions.put(AllocationStatus.FETCHING_SHARD_DATA,
|
||||||
new ShardAllocationDecision(Type.NO, AllocationStatus.FETCHING_SHARD_DATA, null, null, null, null));
|
new ShardAllocationDecision(Type.NO, AllocationStatus.FETCHING_SHARD_DATA, null, null, null, null, null));
|
||||||
cachedDecisions.put(AllocationStatus.NO_VALID_SHARD_COPY,
|
cachedDecisions.put(AllocationStatus.NO_VALID_SHARD_COPY,
|
||||||
new ShardAllocationDecision(Type.NO, AllocationStatus.NO_VALID_SHARD_COPY, null, null, null, null));
|
new ShardAllocationDecision(Type.NO, AllocationStatus.NO_VALID_SHARD_COPY, null, null, null, null, null));
|
||||||
cachedDecisions.put(AllocationStatus.DECIDERS_NO,
|
cachedDecisions.put(AllocationStatus.DECIDERS_NO,
|
||||||
new ShardAllocationDecision(Type.NO, AllocationStatus.DECIDERS_NO, null, null, null, null));
|
new ShardAllocationDecision(Type.NO, AllocationStatus.DECIDERS_NO, null, null, null, null, null));
|
||||||
cachedDecisions.put(AllocationStatus.DECIDERS_THROTTLED,
|
cachedDecisions.put(AllocationStatus.DECIDERS_THROTTLED,
|
||||||
new ShardAllocationDecision(Type.THROTTLE, AllocationStatus.DECIDERS_THROTTLED, null, null, null, null));
|
new ShardAllocationDecision(Type.THROTTLE, AllocationStatus.DECIDERS_THROTTLED, null, null, null, null, null));
|
||||||
cachedDecisions.put(AllocationStatus.DELAYED_ALLOCATION,
|
cachedDecisions.put(AllocationStatus.DELAYED_ALLOCATION,
|
||||||
new ShardAllocationDecision(Type.NO, AllocationStatus.DELAYED_ALLOCATION, null, null, null, null));
|
new ShardAllocationDecision(Type.NO, AllocationStatus.DELAYED_ALLOCATION, null, null, null, null, null));
|
||||||
CACHED_DECISIONS = Collections.unmodifiableMap(cachedDecisions);
|
CACHED_DECISIONS = Collections.unmodifiableMap(cachedDecisions);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -67,14 +67,17 @@ public class ShardAllocationDecision {
|
||||||
@Nullable
|
@Nullable
|
||||||
private final String allocationId;
|
private final String allocationId;
|
||||||
@Nullable
|
@Nullable
|
||||||
private final Map<String, Decision> nodeDecisions;
|
private final Map<String, WeightedDecision> nodeDecisions;
|
||||||
|
@Nullable
|
||||||
|
private final Decision shardDecision;
|
||||||
|
|
||||||
private ShardAllocationDecision(Type finalDecision,
|
private ShardAllocationDecision(Type finalDecision,
|
||||||
AllocationStatus allocationStatus,
|
AllocationStatus allocationStatus,
|
||||||
String finalExplanation,
|
String finalExplanation,
|
||||||
String assignedNodeId,
|
String assignedNodeId,
|
||||||
String allocationId,
|
String allocationId,
|
||||||
Map<String, Decision> nodeDecisions) {
|
Map<String, WeightedDecision> nodeDecisions,
|
||||||
|
Decision shardDecision) {
|
||||||
assert assignedNodeId != null || finalDecision == null || finalDecision != Type.YES :
|
assert assignedNodeId != null || finalDecision == null || finalDecision != Type.YES :
|
||||||
"a yes decision must have a node to assign the shard to";
|
"a yes decision must have a node to assign the shard to";
|
||||||
assert allocationStatus != null || finalDecision == null || finalDecision == Type.YES :
|
assert allocationStatus != null || finalDecision == null || finalDecision == Type.YES :
|
||||||
|
@ -87,6 +90,18 @@ public class ShardAllocationDecision {
|
||||||
this.assignedNodeId = assignedNodeId;
|
this.assignedNodeId = assignedNodeId;
|
||||||
this.allocationId = allocationId;
|
this.allocationId = allocationId;
|
||||||
this.nodeDecisions = nodeDecisions != null ? Collections.unmodifiableMap(nodeDecisions) : null;
|
this.nodeDecisions = nodeDecisions != null ? Collections.unmodifiableMap(nodeDecisions) : null;
|
||||||
|
this.shardDecision = shardDecision;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a NO decision with the given shard-level decision and explanation (if in explain mode).
|
||||||
|
*/
|
||||||
|
public static ShardAllocationDecision no(Decision shardDecision, @Nullable String explanation) {
|
||||||
|
if (explanation != null) {
|
||||||
|
return new ShardAllocationDecision(Type.NO, AllocationStatus.DECIDERS_NO, explanation, null, null, null, shardDecision);
|
||||||
|
} else {
|
||||||
|
return getCachedDecision(AllocationStatus.DECIDERS_NO);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -104,7 +119,7 @@ public class ShardAllocationDecision {
|
||||||
@Nullable Map<String, Decision> nodeDecisions) {
|
@Nullable Map<String, Decision> nodeDecisions) {
|
||||||
Objects.requireNonNull(allocationStatus, "allocationStatus must not be null");
|
Objects.requireNonNull(allocationStatus, "allocationStatus must not be null");
|
||||||
if (explanation != null) {
|
if (explanation != null) {
|
||||||
return new ShardAllocationDecision(Type.NO, allocationStatus, explanation, null, null, nodeDecisions);
|
return new ShardAllocationDecision(Type.NO, allocationStatus, explanation, null, null, asExplanations(nodeDecisions), null);
|
||||||
} else {
|
} else {
|
||||||
return getCachedDecision(allocationStatus);
|
return getCachedDecision(allocationStatus);
|
||||||
}
|
}
|
||||||
|
@ -116,7 +131,8 @@ public class ShardAllocationDecision {
|
||||||
*/
|
*/
|
||||||
public static ShardAllocationDecision throttle(@Nullable String explanation, @Nullable Map<String, Decision> nodeDecisions) {
|
public static ShardAllocationDecision throttle(@Nullable String explanation, @Nullable Map<String, Decision> nodeDecisions) {
|
||||||
if (explanation != null) {
|
if (explanation != null) {
|
||||||
return new ShardAllocationDecision(Type.THROTTLE, AllocationStatus.DECIDERS_THROTTLED, explanation, null, null, nodeDecisions);
|
return new ShardAllocationDecision(Type.THROTTLE, AllocationStatus.DECIDERS_THROTTLED, explanation, null, null,
|
||||||
|
asExplanations(nodeDecisions), null);
|
||||||
} else {
|
} else {
|
||||||
return getCachedDecision(AllocationStatus.DECIDERS_THROTTLED);
|
return getCachedDecision(AllocationStatus.DECIDERS_THROTTLED);
|
||||||
}
|
}
|
||||||
|
@ -130,7 +146,29 @@ public class ShardAllocationDecision {
|
||||||
public static ShardAllocationDecision yes(String assignedNodeId, @Nullable String explanation, @Nullable String allocationId,
|
public static ShardAllocationDecision yes(String assignedNodeId, @Nullable String explanation, @Nullable String allocationId,
|
||||||
@Nullable Map<String, Decision> nodeDecisions) {
|
@Nullable Map<String, Decision> nodeDecisions) {
|
||||||
Objects.requireNonNull(assignedNodeId, "assignedNodeId must not be null");
|
Objects.requireNonNull(assignedNodeId, "assignedNodeId must not be null");
|
||||||
return new ShardAllocationDecision(Type.YES, null, explanation, assignedNodeId, allocationId, nodeDecisions);
|
return new ShardAllocationDecision(Type.YES, null, explanation, assignedNodeId, allocationId, asExplanations(nodeDecisions), null);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a {@link ShardAllocationDecision} from the given {@link Decision} and the assigned node, if any.
|
||||||
|
*/
|
||||||
|
public static ShardAllocationDecision fromDecision(Decision decision, @Nullable String assignedNodeId, boolean explain,
|
||||||
|
@Nullable Map<String, WeightedDecision> nodeDecisions) {
|
||||||
|
final Type decisionType = decision.type();
|
||||||
|
AllocationStatus allocationStatus = decisionType != Type.YES ? AllocationStatus.fromDecision(decisionType) : null;
|
||||||
|
String explanation = null;
|
||||||
|
if (explain) {
|
||||||
|
if (decision.type() == Type.YES) {
|
||||||
|
assert assignedNodeId != null;
|
||||||
|
explanation = "shard assigned to node [" + assignedNodeId + "]";
|
||||||
|
} else if (decision.type() == Type.THROTTLE) {
|
||||||
|
assert assignedNodeId != null;
|
||||||
|
explanation = "shard assignment throttled on node [" + assignedNodeId + "]";
|
||||||
|
} else {
|
||||||
|
explanation = "shard cannot be assigned to any node in the cluster";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return new ShardAllocationDecision(decisionType, allocationStatus, explanation, assignedNodeId, null, nodeDecisions, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static ShardAllocationDecision getCachedDecision(AllocationStatus allocationStatus) {
|
private static ShardAllocationDecision getCachedDecision(AllocationStatus allocationStatus) {
|
||||||
|
@ -138,6 +176,17 @@ public class ShardAllocationDecision {
|
||||||
return Objects.requireNonNull(decision, "precomputed decision not found for " + allocationStatus);
|
return Objects.requireNonNull(decision, "precomputed decision not found for " + allocationStatus);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static Map<String, WeightedDecision> asExplanations(Map<String, Decision> decisionMap) {
|
||||||
|
if (decisionMap != null) {
|
||||||
|
Map<String, WeightedDecision> explanationMap = new HashMap<>();
|
||||||
|
for (Map.Entry<String, Decision> entry : decisionMap.entrySet()) {
|
||||||
|
explanationMap.put(entry.getKey(), new WeightedDecision(entry.getValue(), Float.POSITIVE_INFINITY));
|
||||||
|
}
|
||||||
|
return explanationMap;
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns <code>true</code> if a decision was taken by the allocator, {@code false} otherwise.
|
* Returns <code>true</code> if a decision was taken by the allocator, {@code false} otherwise.
|
||||||
* If no decision was taken, then the rest of the fields in this object are meaningless and return {@code null}.
|
* If no decision was taken, then the rest of the fields in this object are meaningless and return {@code null}.
|
||||||
|
@ -151,7 +200,7 @@ public class ShardAllocationDecision {
|
||||||
* This value can only be {@code null} if {@link #isDecisionTaken()} returns {@code false}.
|
* This value can only be {@code null} if {@link #isDecisionTaken()} returns {@code false}.
|
||||||
*/
|
*/
|
||||||
@Nullable
|
@Nullable
|
||||||
public Type getFinalDecision() {
|
public Type getFinalDecisionType() {
|
||||||
return finalDecision;
|
return finalDecision;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -177,7 +226,7 @@ public class ShardAllocationDecision {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the free-text explanation for the reason behind the decision taken in {@link #getFinalDecision()}.
|
* Returns the free-text explanation for the reason behind the decision taken in {@link #getFinalDecisionType()}.
|
||||||
*/
|
*/
|
||||||
@Nullable
|
@Nullable
|
||||||
public String getFinalExplanation() {
|
public String getFinalExplanation() {
|
||||||
|
@ -185,7 +234,7 @@ public class ShardAllocationDecision {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the node id that the allocator will assign the shard to, unless {@link #getFinalDecision()} returns
|
* Get the node id that the allocator will assign the shard to, unless {@link #getFinalDecisionType()} returns
|
||||||
* a value other than {@link Decision.Type#YES}, in which case this returns {@code null}.
|
* a value other than {@link Decision.Type#YES}, in which case this returns {@code null}.
|
||||||
*/
|
*/
|
||||||
@Nullable
|
@Nullable
|
||||||
|
@ -206,11 +255,74 @@ public class ShardAllocationDecision {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Gets the individual node-level decisions that went into making the final decision as represented by
|
* Gets the individual node-level decisions that went into making the final decision as represented by
|
||||||
* {@link #getFinalDecision()}. The map that is returned has the node id as the key and a {@link Decision}
|
* {@link #getFinalDecisionType()}. The map that is returned has the node id as the key and a {@link Decision}
|
||||||
* as the decision for the given node.
|
* as the decision for the given node.
|
||||||
*/
|
*/
|
||||||
@Nullable
|
@Nullable
|
||||||
public Map<String, Decision> getNodeDecisions() {
|
public Map<String, WeightedDecision> getNodeDecisions() {
|
||||||
return nodeDecisions;
|
return nodeDecisions;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gets the decision on allocating a shard, without examining any specific nodes to allocate to
|
||||||
|
* (e.g. a replica can never be allocated if the primary is not allocated, so this is a shard-level
|
||||||
|
* decision, not having taken any node into account).
|
||||||
|
*/
|
||||||
|
@Nullable
|
||||||
|
public Decision getShardDecision() {
|
||||||
|
return shardDecision;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This class represents the shard allocation decision for a single node,
|
||||||
|
* including the {@link Decision} whether to allocate to the node and the
|
||||||
|
* weight assigned to the node for the shard in question.
|
||||||
|
*/
|
||||||
|
public static final class WeightedDecision {
|
||||||
|
|
||||||
|
private final Decision decision;
|
||||||
|
private final float weight;
|
||||||
|
|
||||||
|
public WeightedDecision(Decision decision) {
|
||||||
|
this.decision = Objects.requireNonNull(decision);
|
||||||
|
this.weight = Float.POSITIVE_INFINITY;
|
||||||
|
}
|
||||||
|
|
||||||
|
public WeightedDecision(Decision decision, float weight) {
|
||||||
|
this.decision = Objects.requireNonNull(decision);
|
||||||
|
this.weight = Objects.requireNonNull(weight);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The decision for allocating to the node.
|
||||||
|
*/
|
||||||
|
public Decision getDecision() {
|
||||||
|
return decision;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The calculated weight for allocating a shard to the node. A value of {@link Float#POSITIVE_INFINITY}
|
||||||
|
* means the weight was not calculated or factored into the decision.
|
||||||
|
*/
|
||||||
|
public float getWeight() {
|
||||||
|
return weight;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object other) {
|
||||||
|
if (this == other) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
if (other == null || getClass() != other.getClass()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
WeightedDecision that = (WeightedDecision) other;
|
||||||
|
return decision.equals(that.decision) && Float.compare(weight, that.weight) == 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
return Objects.hash(decision, weight);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -31,10 +31,13 @@ import org.elasticsearch.cluster.routing.ShardRouting;
|
||||||
import org.elasticsearch.cluster.routing.ShardRoutingState;
|
import org.elasticsearch.cluster.routing.ShardRoutingState;
|
||||||
import org.elasticsearch.cluster.routing.UnassignedInfo;
|
import org.elasticsearch.cluster.routing.UnassignedInfo;
|
||||||
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
||||||
|
import org.elasticsearch.cluster.routing.allocation.ShardAllocationDecision;
|
||||||
|
import org.elasticsearch.cluster.routing.allocation.ShardAllocationDecision.WeightedDecision;
|
||||||
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
|
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
|
||||||
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
|
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
|
||||||
import org.elasticsearch.cluster.routing.allocation.decider.Decision.Type;
|
import org.elasticsearch.cluster.routing.allocation.decider.Decision.Type;
|
||||||
import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider;
|
import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider;
|
||||||
|
import org.elasticsearch.common.Nullable;
|
||||||
import org.elasticsearch.common.collect.Tuple;
|
import org.elasticsearch.common.collect.Tuple;
|
||||||
import org.elasticsearch.common.component.AbstractComponent;
|
import org.elasticsearch.common.component.AbstractComponent;
|
||||||
import org.elasticsearch.common.inject.Inject;
|
import org.elasticsearch.common.inject.Inject;
|
||||||
|
@ -51,6 +54,7 @@ import java.util.HashSet;
|
||||||
import java.util.IdentityHashMap;
|
import java.util.IdentityHashMap;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Objects;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
|
import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
|
||||||
|
@ -503,24 +507,50 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
||||||
// offloading the shards.
|
// offloading the shards.
|
||||||
for (Iterator<ShardRouting> it = allocation.routingNodes().nodeInterleavedShardIterator(); it.hasNext(); ) {
|
for (Iterator<ShardRouting> it = allocation.routingNodes().nodeInterleavedShardIterator(); it.hasNext(); ) {
|
||||||
ShardRouting shardRouting = it.next();
|
ShardRouting shardRouting = it.next();
|
||||||
// we can only move started shards...
|
final MoveDecision moveDecision = makeMoveDecision(shardRouting);
|
||||||
if (shardRouting.started()) {
|
if (moveDecision.move()) {
|
||||||
final ModelNode sourceNode = nodes.get(shardRouting.currentNodeId());
|
final ModelNode sourceNode = nodes.get(shardRouting.currentNodeId());
|
||||||
assert sourceNode != null && sourceNode.containsShard(shardRouting);
|
final ModelNode targetNode = nodes.get(moveDecision.getAssignedNodeId());
|
||||||
RoutingNode routingNode = sourceNode.getRoutingNode();
|
sourceNode.removeShard(shardRouting);
|
||||||
Decision decision = allocation.deciders().canRemain(shardRouting, routingNode, allocation);
|
Tuple<ShardRouting, ShardRouting> relocatingShards = routingNodes.relocateShard(shardRouting, targetNode.getNodeId(),
|
||||||
if (decision.type() == Decision.Type.NO) {
|
allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE), allocation.changes());
|
||||||
moveShard(shardRouting, sourceNode, routingNode);
|
targetNode.addShard(relocatingShards.v2());
|
||||||
|
if (logger.isTraceEnabled()) {
|
||||||
|
logger.trace("Moved shard [{}] to node [{}]", shardRouting, targetNode.getRoutingNode());
|
||||||
}
|
}
|
||||||
|
} else if (moveDecision.cannotRemain()) {
|
||||||
|
logger.trace("[{}][{}] can't move", shardRouting.index(), shardRouting.id());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Move started shard to the minimal eligible node with respect to the weight function
|
* Makes a decision on whether to move a started shard to another node. The following rules apply
|
||||||
|
* to the {@link MoveDecision} return object:
|
||||||
|
* 1. If the shard is not started, no decision will be taken and {@link MoveDecision#isDecisionTaken()} will return false.
|
||||||
|
* 2. If the shard is allowed to remain on its current node, no attempt will be made to move the shard and
|
||||||
|
* {@link MoveDecision#canRemainDecision} will have a decision type of YES. All other fields in the object will be null.
|
||||||
|
* 3. If the shard is not allowed to remain on its current node, then {@link MoveDecision#finalDecision} will be populated
|
||||||
|
* with the decision of moving to another node. If {@link MoveDecision#finalDecision} returns YES, then
|
||||||
|
* {@link MoveDecision#assignedNodeId} will return a non-null value, otherwise the assignedNodeId will be null.
|
||||||
|
* 4. If the method is invoked in explain mode (e.g. from the cluster allocation explain APIs), then
|
||||||
|
* {@link MoveDecision#finalExplanation} and {@link MoveDecision#nodeDecisions} will have non-null values.
|
||||||
*/
|
*/
|
||||||
private void moveShard(ShardRouting shardRouting, ModelNode sourceNode, RoutingNode routingNode) {
|
public MoveDecision makeMoveDecision(final ShardRouting shardRouting) {
|
||||||
logger.debug("[{}][{}] allocated on [{}], but can no longer be allocated on it, moving...", shardRouting.index(), shardRouting.id(), routingNode.node());
|
if (shardRouting.started() == false) {
|
||||||
|
// we can only move started shards
|
||||||
|
return MoveDecision.DECISION_NOT_TAKEN;
|
||||||
|
}
|
||||||
|
|
||||||
|
final boolean explain = allocation.debugDecision();
|
||||||
|
final ModelNode sourceNode = nodes.get(shardRouting.currentNodeId());
|
||||||
|
assert sourceNode != null && sourceNode.containsShard(shardRouting);
|
||||||
|
RoutingNode routingNode = sourceNode.getRoutingNode();
|
||||||
|
Decision canRemain = allocation.deciders().canRemain(shardRouting, routingNode, allocation);
|
||||||
|
if (canRemain.type() != Decision.Type.NO) {
|
||||||
|
return MoveDecision.stay(canRemain, explain);
|
||||||
|
}
|
||||||
|
|
||||||
sorter.reset(shardRouting.getIndexName());
|
sorter.reset(shardRouting.getIndexName());
|
||||||
/*
|
/*
|
||||||
* the sorter holds the minimum weight node first for the shards index.
|
* the sorter holds the minimum weight node first for the shards index.
|
||||||
|
@ -528,23 +558,34 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
||||||
* This is not guaranteed to be balanced after this operation we still try best effort to
|
* This is not guaranteed to be balanced after this operation we still try best effort to
|
||||||
* allocate on the minimal eligible node.
|
* allocate on the minimal eligible node.
|
||||||
*/
|
*/
|
||||||
|
Type bestDecision = Type.NO;
|
||||||
|
RoutingNode targetNode = null;
|
||||||
|
final Map<String, WeightedDecision> nodeExplanationMap = explain ? new HashMap<>() : null;
|
||||||
for (ModelNode currentNode : sorter.modelNodes) {
|
for (ModelNode currentNode : sorter.modelNodes) {
|
||||||
if (currentNode != sourceNode) {
|
if (currentNode != sourceNode) {
|
||||||
RoutingNode target = currentNode.getRoutingNode();
|
RoutingNode target = currentNode.getRoutingNode();
|
||||||
// don't use canRebalance as we want hard filtering rules to apply. See #17698
|
// don't use canRebalance as we want hard filtering rules to apply. See #17698
|
||||||
Decision allocationDecision = allocation.deciders().canAllocate(shardRouting, target, allocation);
|
Decision allocationDecision = allocation.deciders().canAllocate(shardRouting, target, allocation);
|
||||||
if (allocationDecision.type() == Type.YES) { // TODO maybe we can respect throttling here too?
|
if (explain) {
|
||||||
sourceNode.removeShard(shardRouting);
|
nodeExplanationMap.put(currentNode.getNodeId(), new WeightedDecision(allocationDecision, sorter.weight(currentNode)));
|
||||||
Tuple<ShardRouting, ShardRouting> relocatingShards = routingNodes.relocateShard(shardRouting, target.nodeId(), allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE), allocation.changes());
|
|
||||||
currentNode.addShard(relocatingShards.v2());
|
|
||||||
if (logger.isTraceEnabled()) {
|
|
||||||
logger.trace("Moved shard [{}] to node [{}]", shardRouting, routingNode.node());
|
|
||||||
}
|
}
|
||||||
return;
|
// TODO maybe we can respect throttling here too?
|
||||||
|
if (allocationDecision.type().higherThan(bestDecision)) {
|
||||||
|
bestDecision = allocationDecision.type();
|
||||||
|
if (bestDecision == Type.YES) {
|
||||||
|
targetNode = target;
|
||||||
|
if (explain == false) {
|
||||||
|
// we are not in explain mode and already have a YES decision on the best weighted node,
|
||||||
|
// no need to continue iterating
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
logger.debug("[{}][{}] can't move", shardRouting.index(), shardRouting.id());
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return MoveDecision.decision(canRemain, bestDecision, explain, shardRouting.currentNodeId(),
|
||||||
|
targetNode != null ? targetNode.nodeId() : null, nodeExplanationMap);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -627,11 +668,12 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
||||||
do {
|
do {
|
||||||
for (int i = 0; i < primaryLength; i++) {
|
for (int i = 0; i < primaryLength; i++) {
|
||||||
ShardRouting shard = primary[i];
|
ShardRouting shard = primary[i];
|
||||||
Tuple<Decision, ModelNode> allocationDecision = allocateUnassignedShard(shard, throttledNodes);
|
ShardAllocationDecision allocationDecision = decideAllocateUnassigned(shard, throttledNodes);
|
||||||
final Decision decision = allocationDecision.v1();
|
final Type decisionType = allocationDecision.getFinalDecisionType();
|
||||||
final ModelNode minNode = allocationDecision.v2();
|
final String assignedNodeId = allocationDecision.getAssignedNodeId();
|
||||||
|
final ModelNode minNode = assignedNodeId != null ? nodes.get(assignedNodeId) : null;
|
||||||
|
|
||||||
if (decision.type() == Type.YES) {
|
if (decisionType == Type.YES) {
|
||||||
if (logger.isTraceEnabled()) {
|
if (logger.isTraceEnabled()) {
|
||||||
logger.trace("Assigned shard [{}] to [{}]", shard, minNode.getNodeId());
|
logger.trace("Assigned shard [{}] to [{}]", shard, minNode.getNodeId());
|
||||||
}
|
}
|
||||||
|
@ -650,12 +692,12 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
||||||
} else {
|
} else {
|
||||||
// did *not* receive a YES decision
|
// did *not* receive a YES decision
|
||||||
if (logger.isTraceEnabled()) {
|
if (logger.isTraceEnabled()) {
|
||||||
logger.trace("No eligible node found to assign shard [{}] decision [{}]", shard, decision.type());
|
logger.trace("No eligible node found to assign shard [{}] decision [{}]", shard, decisionType);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (minNode != null) {
|
if (minNode != null) {
|
||||||
// throttle decision scenario
|
// throttle decision scenario
|
||||||
assert decision.type() == Type.THROTTLE;
|
assert decisionType == Type.THROTTLE;
|
||||||
final long shardSize = DiskThresholdDecider.getExpectedShardSize(shard, allocation,
|
final long shardSize = DiskThresholdDecider.getExpectedShardSize(shard, allocation,
|
||||||
ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
|
ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
|
||||||
minNode.addShard(shard.initialize(minNode.getNodeId(), null, shardSize));
|
minNode.addShard(shard.initialize(minNode.getNodeId(), null, shardSize));
|
||||||
|
@ -663,19 +705,19 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
||||||
final Decision.Type nodeLevelDecision = deciders.canAllocate(node, allocation).type();
|
final Decision.Type nodeLevelDecision = deciders.canAllocate(node, allocation).type();
|
||||||
if (nodeLevelDecision != Type.YES) {
|
if (nodeLevelDecision != Type.YES) {
|
||||||
if (logger.isTraceEnabled()) {
|
if (logger.isTraceEnabled()) {
|
||||||
logger.trace("Can not allocate on node [{}] remove from round decision [{}]", node, decision.type());
|
logger.trace("Can not allocate on node [{}] remove from round decision [{}]", node, decisionType);
|
||||||
}
|
}
|
||||||
assert nodeLevelDecision == Type.NO;
|
assert nodeLevelDecision == Type.NO;
|
||||||
throttledNodes.add(minNode);
|
throttledNodes.add(minNode);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
assert decision.type() == Type.NO;
|
assert decisionType == Type.NO;
|
||||||
if (logger.isTraceEnabled()) {
|
if (logger.isTraceEnabled()) {
|
||||||
logger.trace("No Node found to assign shard [{}]", shard);
|
logger.trace("No Node found to assign shard [{}]", shard);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
UnassignedInfo.AllocationStatus allocationStatus = UnassignedInfo.AllocationStatus.fromDecision(decision);
|
UnassignedInfo.AllocationStatus allocationStatus = UnassignedInfo.AllocationStatus.fromDecision(decisionType);
|
||||||
unassigned.ignoreShard(shard, allocationStatus, allocation.changes());
|
unassigned.ignoreShard(shard, allocationStatus, allocation.changes());
|
||||||
if (!shard.primary()) { // we could not allocate it and we are a replica - check if we can ignore the other replicas
|
if (!shard.primary()) { // we could not allocate it and we are a replica - check if we can ignore the other replicas
|
||||||
while(i < primaryLength-1 && comparator.compare(primary[i], primary[i+1]) == 0) {
|
while(i < primaryLength-1 && comparator.compare(primary[i], primary[i+1]) == 0) {
|
||||||
|
@ -699,33 +741,48 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
||||||
* {@link ModelNode} representing the node that the shard should be assigned to. If the decision returned
|
* {@link ModelNode} representing the node that the shard should be assigned to. If the decision returned
|
||||||
* is of type {@link Type#NO}, then the assigned node will be null.
|
* is of type {@link Type#NO}, then the assigned node will be null.
|
||||||
*/
|
*/
|
||||||
private Tuple<Decision, ModelNode> allocateUnassignedShard(final ShardRouting shard, final Set<ModelNode> throttledNodes) {
|
private ShardAllocationDecision decideAllocateUnassigned(final ShardRouting shard, final Set<ModelNode> throttledNodes) {
|
||||||
assert !shard.assignedToNode() : "not an unassigned shard: " + shard;
|
if (shard.assignedToNode()) {
|
||||||
if (allocation.deciders().canAllocate(shard, allocation).type() == Type.NO) {
|
// we only make decisions for unassigned shards here
|
||||||
|
return ShardAllocationDecision.DECISION_NOT_TAKEN;
|
||||||
|
}
|
||||||
|
|
||||||
|
Decision shardLevelDecision = allocation.deciders().canAllocate(shard, allocation);
|
||||||
|
if (shardLevelDecision.type() == Type.NO) {
|
||||||
// NO decision for allocating the shard, irrespective of any particular node, so exit early
|
// NO decision for allocating the shard, irrespective of any particular node, so exit early
|
||||||
return Tuple.tuple(Decision.NO, null);
|
return ShardAllocationDecision.no(shardLevelDecision, explain("cannot allocate shard in its current state"));
|
||||||
}
|
}
|
||||||
|
|
||||||
/* find an node with minimal weight we can allocate on*/
|
/* find an node with minimal weight we can allocate on*/
|
||||||
float minWeight = Float.POSITIVE_INFINITY;
|
float minWeight = Float.POSITIVE_INFINITY;
|
||||||
ModelNode minNode = null;
|
ModelNode minNode = null;
|
||||||
Decision decision = null;
|
Decision decision = null;
|
||||||
if (throttledNodes.size() < nodes.size()) {
|
final boolean explain = allocation.debugDecision();
|
||||||
|
if (throttledNodes.size() >= nodes.size() && explain == false) {
|
||||||
|
// all nodes are throttled, so we know we won't be able to allocate this round,
|
||||||
|
// so if we are not in explain mode, short circuit
|
||||||
|
return ShardAllocationDecision.no(UnassignedInfo.AllocationStatus.DECIDERS_NO, null);
|
||||||
|
}
|
||||||
/* Don't iterate over an identity hashset here the
|
/* Don't iterate over an identity hashset here the
|
||||||
* iteration order is different for each run and makes testing hard */
|
* iteration order is different for each run and makes testing hard */
|
||||||
|
Map<String, WeightedDecision> nodeExplanationMap = explain ? new HashMap<>() : null;
|
||||||
for (ModelNode node : nodes.values()) {
|
for (ModelNode node : nodes.values()) {
|
||||||
if (throttledNodes.contains(node)) {
|
if ((throttledNodes.contains(node) || node.containsShard(shard)) && explain == false) {
|
||||||
|
// decision is NO without needing to check anything further, so short circuit
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (!node.containsShard(shard)) {
|
|
||||||
// simulate weight if we would add shard to node
|
// simulate weight if we would add shard to node
|
||||||
float currentWeight = weight.weightShardAdded(this, node, shard.getIndexName());
|
float currentWeight = weight.weightShardAdded(this, node, shard.getIndexName());
|
||||||
/*
|
// moving the shard would not improve the balance, and we are not in explain mode, so short circuit
|
||||||
* Unless the operation is not providing any gains we
|
if (currentWeight > minWeight && explain == false) {
|
||||||
* don't check deciders
|
continue;
|
||||||
*/
|
}
|
||||||
if (currentWeight <= minWeight) {
|
|
||||||
Decision currentDecision = allocation.deciders().canAllocate(shard, node.getRoutingNode(), allocation);
|
Decision currentDecision = allocation.deciders().canAllocate(shard, node.getRoutingNode(), allocation);
|
||||||
|
if (explain) {
|
||||||
|
nodeExplanationMap.put(node.getNodeId(), new WeightedDecision(currentDecision, currentWeight));
|
||||||
|
}
|
||||||
if (currentDecision.type() == Type.YES || currentDecision.type() == Type.THROTTLE) {
|
if (currentDecision.type() == Type.YES || currentDecision.type() == Type.THROTTLE) {
|
||||||
final boolean updateMinNode;
|
final boolean updateMinNode;
|
||||||
if (currentWeight == minWeight) {
|
if (currentWeight == minWeight) {
|
||||||
|
@ -761,14 +818,25 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (decision == null) {
|
if (decision == null) {
|
||||||
// decision was not set and a node was not assigned, so treat it as a NO decision
|
// decision was not set and a node was not assigned, so treat it as a NO decision
|
||||||
decision = Decision.NO;
|
decision = Decision.NO;
|
||||||
}
|
}
|
||||||
return Tuple.tuple(decision, minNode);
|
return ShardAllocationDecision.fromDecision(
|
||||||
|
decision,
|
||||||
|
minNode != null ? minNode.getNodeId() : null,
|
||||||
|
explain,
|
||||||
|
nodeExplanationMap
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
// provide an explanation, if in explain mode
|
||||||
|
private String explain(String explanation) {
|
||||||
|
if (allocation.debugDecision()) {
|
||||||
|
return explanation;
|
||||||
|
} else {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1031,4 +1099,157 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
||||||
return weights[weights.length - 1] - weights[0];
|
return weights[weights.length - 1] - weights[0];
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Represents a decision to relocate a started shard from its current node.
|
||||||
|
*/
|
||||||
|
public abstract static class RelocationDecision {
|
||||||
|
@Nullable
|
||||||
|
private final Type finalDecision;
|
||||||
|
@Nullable
|
||||||
|
private final String finalExplanation;
|
||||||
|
@Nullable
|
||||||
|
private final String assignedNodeId;
|
||||||
|
@Nullable
|
||||||
|
private final Map<String, WeightedDecision> nodeDecisions;
|
||||||
|
|
||||||
|
protected RelocationDecision(Type finalDecision, String finalExplanation, String assignedNodeId,
|
||||||
|
Map<String, WeightedDecision> nodeDecisions) {
|
||||||
|
this.finalDecision = finalDecision;
|
||||||
|
this.finalExplanation = finalExplanation;
|
||||||
|
this.assignedNodeId = assignedNodeId;
|
||||||
|
this.nodeDecisions = nodeDecisions;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns {@code true} if a decision was taken by the allocator, {@code false} otherwise.
|
||||||
|
* If no decision was taken, then the rest of the fields in this object are meaningless and return {@code null}.
|
||||||
|
*/
|
||||||
|
public boolean isDecisionTaken() {
|
||||||
|
return finalDecision != null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the final decision made by the allocator on whether to assign the shard, and
|
||||||
|
* {@code null} if no decision was taken.
|
||||||
|
*/
|
||||||
|
public Type getFinalDecisionType() {
|
||||||
|
return finalDecision;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the free-text explanation for the reason behind the decision taken in {@link #getFinalDecisionType()}.
|
||||||
|
*/
|
||||||
|
@Nullable
|
||||||
|
public String getFinalExplanation() {
|
||||||
|
return finalExplanation;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the node id that the allocator will assign the shard to, unless {@link #getFinalDecisionType()} returns
|
||||||
|
* a value other than {@link Decision.Type#YES}, in which case this returns {@code null}.
|
||||||
|
*/
|
||||||
|
@Nullable
|
||||||
|
public String getAssignedNodeId() {
|
||||||
|
return assignedNodeId;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gets the individual node-level decisions that went into making the final decision as represented by
|
||||||
|
* {@link #getFinalDecisionType()}. The map that is returned has the node id as the key and a {@link WeightedDecision}.
|
||||||
|
*/
|
||||||
|
@Nullable
|
||||||
|
public Map<String, WeightedDecision> getNodeDecisions() {
|
||||||
|
return nodeDecisions;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Represents a decision to move a started shard because it is no longer allowed to remain on its current node.
|
||||||
|
*/
|
||||||
|
public static final class MoveDecision extends RelocationDecision {
|
||||||
|
/** a constant representing no decision taken */
|
||||||
|
public static final MoveDecision DECISION_NOT_TAKEN = new MoveDecision(null, null, null, null, null);
|
||||||
|
/** cached decisions so we don't have to recreate objects for common decisions when not in explain mode. */
|
||||||
|
private static final MoveDecision CACHED_STAY_DECISION = new MoveDecision(Decision.YES, Type.NO, null, null, null);
|
||||||
|
private static final MoveDecision CACHED_CANNOT_MOVE_DECISION = new MoveDecision(Decision.NO, Type.NO, null, null, null);
|
||||||
|
|
||||||
|
@Nullable
|
||||||
|
private final Decision canRemainDecision;
|
||||||
|
|
||||||
|
private MoveDecision(Decision canRemainDecision, Type finalDecision, String finalExplanation,
|
||||||
|
String assignedNodeId, Map<String, WeightedDecision> nodeDecisions) {
|
||||||
|
super(finalDecision, finalExplanation, assignedNodeId, nodeDecisions);
|
||||||
|
this.canRemainDecision = canRemainDecision;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a move decision for the shard being able to remain on its current node, so not moving.
|
||||||
|
*/
|
||||||
|
public static MoveDecision stay(Decision canRemainDecision, boolean explain) {
|
||||||
|
assert canRemainDecision.type() != Type.NO;
|
||||||
|
if (explain) {
|
||||||
|
final String explanation;
|
||||||
|
if (explain) {
|
||||||
|
explanation = "shard is allowed to remain on its current node, so no reason to move";
|
||||||
|
} else {
|
||||||
|
explanation = null;
|
||||||
|
}
|
||||||
|
return new MoveDecision(Objects.requireNonNull(canRemainDecision), Type.NO, explanation, null, null);
|
||||||
|
} else {
|
||||||
|
return CACHED_STAY_DECISION;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a move decision for the shard not being able to remain on its current node.
|
||||||
|
*
|
||||||
|
* @param canRemainDecision the decision for whether the shard is allowed to remain on its current node
|
||||||
|
* @param finalDecision the decision of whether to move the shard to another node
|
||||||
|
* @param explain true if in explain mode
|
||||||
|
* @param currentNodeId the current node id where the shard is assigned
|
||||||
|
* @param assignedNodeId the node id for where the shard can move to
|
||||||
|
* @param nodeDecisions the node-level decisions that comprised the final decision, non-null iff explain is true
|
||||||
|
* @return the {@link MoveDecision} for moving the shard to another node
|
||||||
|
*/
|
||||||
|
public static MoveDecision decision(Decision canRemainDecision, Type finalDecision, boolean explain, String currentNodeId,
|
||||||
|
String assignedNodeId, Map<String, WeightedDecision> nodeDecisions) {
|
||||||
|
assert canRemainDecision != null;
|
||||||
|
assert canRemainDecision.type() != Type.YES : "create decision with MoveDecision#stay instead";
|
||||||
|
String finalExplanation = null;
|
||||||
|
if (explain) {
|
||||||
|
assert currentNodeId != null;
|
||||||
|
if (finalDecision == Type.YES) {
|
||||||
|
assert assignedNodeId != null;
|
||||||
|
finalExplanation = "shard cannot remain on node [" + currentNodeId + "], moving to node [" + assignedNodeId + "]";
|
||||||
|
} else if (finalDecision == Type.THROTTLE) {
|
||||||
|
finalExplanation = "shard cannot remain on node [" + currentNodeId + "], throttled on moving to another node";
|
||||||
|
} else {
|
||||||
|
finalExplanation = "shard cannot remain on node [" + currentNodeId + "], but cannot be assigned to any other node";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (finalExplanation == null && finalDecision == Type.NO) {
|
||||||
|
// the final decision is NO (no node to move the shard to) and we are not in explain mode, return a cached version
|
||||||
|
return CACHED_CANNOT_MOVE_DECISION;
|
||||||
|
} else {
|
||||||
|
assert ((assignedNodeId == null) == (finalDecision != Type.YES));
|
||||||
|
return new MoveDecision(canRemainDecision, finalDecision, finalExplanation, assignedNodeId, nodeDecisions);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns {@code true} if the shard cannot remain on its current node and can be moved, returns {@code false} otherwise.
|
||||||
|
*/
|
||||||
|
public boolean move() {
|
||||||
|
return cannotRemain() && getFinalDecisionType() == Type.YES;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns {@code true} if the shard cannot remain on its current node.
|
||||||
|
*/
|
||||||
|
public boolean cannotRemain() {
|
||||||
|
return isDecisionTaken() && canRemainDecision.type() == Type.NO;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -138,6 +138,18 @@ public abstract class Decision implements ToXContent {
|
||||||
throw new IllegalArgumentException("Invalid Type [" + type + "]");
|
throw new IllegalArgumentException("Invalid Type [" + type + "]");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean higherThan(Type other) {
|
||||||
|
if (this == NO) {
|
||||||
|
return false;
|
||||||
|
} else if (other == NO) {
|
||||||
|
return true;
|
||||||
|
} else if (other == THROTTLE && this == YES) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -153,7 +153,7 @@ public abstract class ReplicaShardAllocator extends BaseGatewayShardAllocator {
|
||||||
Tuple<Decision, Map<String, Decision>> allocateDecision = canBeAllocatedToAtLeastOneNode(unassignedShard, allocation, explain);
|
Tuple<Decision, Map<String, Decision>> allocateDecision = canBeAllocatedToAtLeastOneNode(unassignedShard, allocation, explain);
|
||||||
if (allocateDecision.v1().type() != Decision.Type.YES) {
|
if (allocateDecision.v1().type() != Decision.Type.YES) {
|
||||||
logger.trace("{}: ignoring allocation, can't be allocated on any node", unassignedShard);
|
logger.trace("{}: ignoring allocation, can't be allocated on any node", unassignedShard);
|
||||||
return ShardAllocationDecision.no(UnassignedInfo.AllocationStatus.fromDecision(allocateDecision.v1()),
|
return ShardAllocationDecision.no(UnassignedInfo.AllocationStatus.fromDecision(allocateDecision.v1().type()),
|
||||||
explain ? "all nodes returned a " + allocateDecision.v1().type() + " decision for allocating the replica shard" : null,
|
explain ? "all nodes returned a " + allocateDecision.v1().type() + " decision for allocating the replica shard" : null,
|
||||||
allocateDecision.v2());
|
allocateDecision.v2());
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,94 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Elasticsearch under one or more contributor
|
||||||
|
* license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright
|
||||||
|
* ownership. Elasticsearch licenses this file to you under
|
||||||
|
* the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
* not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.elasticsearch.cluster.routing.allocation;
|
||||||
|
|
||||||
|
import org.elasticsearch.cluster.routing.allocation.ShardAllocationDecision.WeightedDecision;
|
||||||
|
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.MoveDecision;
|
||||||
|
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
|
||||||
|
import org.elasticsearch.cluster.routing.allocation.decider.Decision.Type;
|
||||||
|
import org.elasticsearch.test.ESTestCase;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Unit tests for the {@link MoveDecision} class.
|
||||||
|
*/
|
||||||
|
public class MoveDecisionTests extends ESTestCase {
|
||||||
|
|
||||||
|
public void testCachedDecisions() {
|
||||||
|
// cached stay decision
|
||||||
|
MoveDecision stay1 = MoveDecision.stay(Decision.YES, false);
|
||||||
|
MoveDecision stay2 = MoveDecision.stay(Decision.YES, false);
|
||||||
|
assertSame(stay1, stay2); // not in explain mode, so should use cached decision
|
||||||
|
stay1 = MoveDecision.stay(Decision.YES, true);
|
||||||
|
stay2 = MoveDecision.stay(Decision.YES, true);
|
||||||
|
assertNotSame(stay1, stay2);
|
||||||
|
|
||||||
|
// cached cannot move decision
|
||||||
|
stay1 = MoveDecision.decision(Decision.NO, Type.NO, false, null, null, null);
|
||||||
|
stay2 = MoveDecision.decision(Decision.NO, Type.NO, false, null, null, null);
|
||||||
|
assertSame(stay1, stay2);
|
||||||
|
// final decision is YES, so shouldn't use cached decision
|
||||||
|
stay1 = MoveDecision.decision(Decision.NO, Type.YES, false, null, "node1", null);
|
||||||
|
stay2 = MoveDecision.decision(Decision.NO, Type.YES, false, null, "node1", null);
|
||||||
|
assertNotSame(stay1, stay2);
|
||||||
|
assertEquals(stay1.getAssignedNodeId(), stay2.getAssignedNodeId());
|
||||||
|
// final decision is NO, but in explain mode, so shouldn't use cached decision
|
||||||
|
stay1 = MoveDecision.decision(Decision.NO, Type.NO, true, "node1", null, null);
|
||||||
|
stay2 = MoveDecision.decision(Decision.NO, Type.NO, true, "node1", null, null);
|
||||||
|
assertNotSame(stay1, stay2);
|
||||||
|
assertSame(stay1.getFinalDecisionType(), stay2.getFinalDecisionType());
|
||||||
|
assertNotNull(stay1.getFinalExplanation());
|
||||||
|
assertEquals(stay1.getFinalExplanation(), stay2.getFinalExplanation());
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testStayDecision() {
|
||||||
|
MoveDecision stay = MoveDecision.stay(Decision.YES, true);
|
||||||
|
assertFalse(stay.cannotRemain());
|
||||||
|
assertFalse(stay.move());
|
||||||
|
assertTrue(stay.isDecisionTaken());
|
||||||
|
assertNull(stay.getNodeDecisions());
|
||||||
|
assertNotNull(stay.getFinalExplanation());
|
||||||
|
assertEquals(Type.NO, stay.getFinalDecisionType());
|
||||||
|
|
||||||
|
stay = MoveDecision.stay(Decision.YES, false);
|
||||||
|
assertFalse(stay.cannotRemain());
|
||||||
|
assertFalse(stay.move());
|
||||||
|
assertTrue(stay.isDecisionTaken());
|
||||||
|
assertNull(stay.getNodeDecisions());
|
||||||
|
assertNull(stay.getFinalExplanation());
|
||||||
|
assertEquals(Type.NO, stay.getFinalDecisionType());
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testDecisionWithExplain() {
|
||||||
|
Map<String, WeightedDecision> nodeDecisions = new HashMap<>();
|
||||||
|
nodeDecisions.put("node1", new WeightedDecision(randomFrom(Decision.NO, Decision.THROTTLE, Decision.YES), randomFloat()));
|
||||||
|
nodeDecisions.put("node2", new WeightedDecision(randomFrom(Decision.NO, Decision.THROTTLE, Decision.YES), randomFloat()));
|
||||||
|
MoveDecision decision = MoveDecision.decision(Decision.NO, Type.NO, true, "node1", null, nodeDecisions);
|
||||||
|
assertNotNull(decision.getFinalDecisionType());
|
||||||
|
assertNotNull(decision.getFinalExplanation());
|
||||||
|
assertNotNull(decision.getNodeDecisions());
|
||||||
|
assertEquals(2, decision.getNodeDecisions().size());
|
||||||
|
|
||||||
|
decision = MoveDecision.decision(Decision.NO, Type.YES, true, "node1", "node2", null);
|
||||||
|
assertEquals("node2", decision.getAssignedNodeId());
|
||||||
|
}
|
||||||
|
}
|
|
@ -20,6 +20,7 @@
|
||||||
package org.elasticsearch.cluster.routing.allocation;
|
package org.elasticsearch.cluster.routing.allocation;
|
||||||
|
|
||||||
import org.elasticsearch.cluster.routing.UnassignedInfo.AllocationStatus;
|
import org.elasticsearch.cluster.routing.UnassignedInfo.AllocationStatus;
|
||||||
|
import org.elasticsearch.cluster.routing.allocation.ShardAllocationDecision.WeightedDecision;
|
||||||
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
|
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
|
|
||||||
|
@ -28,6 +29,7 @@ import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Unit tests for the {@link ShardAllocationDecision} class.
|
* Unit tests for the {@link ShardAllocationDecision} class.
|
||||||
|
@ -37,7 +39,7 @@ public class ShardAllocationDecisionTests extends ESTestCase {
|
||||||
public void testDecisionNotTaken() {
|
public void testDecisionNotTaken() {
|
||||||
ShardAllocationDecision shardAllocationDecision = ShardAllocationDecision.DECISION_NOT_TAKEN;
|
ShardAllocationDecision shardAllocationDecision = ShardAllocationDecision.DECISION_NOT_TAKEN;
|
||||||
assertFalse(shardAllocationDecision.isDecisionTaken());
|
assertFalse(shardAllocationDecision.isDecisionTaken());
|
||||||
assertNull(shardAllocationDecision.getFinalDecision());
|
assertNull(shardAllocationDecision.getFinalDecisionType());
|
||||||
assertNull(shardAllocationDecision.getAllocationStatus());
|
assertNull(shardAllocationDecision.getAllocationStatus());
|
||||||
assertNull(shardAllocationDecision.getAllocationId());
|
assertNull(shardAllocationDecision.getAllocationId());
|
||||||
assertNull(shardAllocationDecision.getAssignedNodeId());
|
assertNull(shardAllocationDecision.getAssignedNodeId());
|
||||||
|
@ -52,19 +54,21 @@ public class ShardAllocationDecisionTests extends ESTestCase {
|
||||||
);
|
);
|
||||||
ShardAllocationDecision noDecision = ShardAllocationDecision.no(allocationStatus, "something is wrong");
|
ShardAllocationDecision noDecision = ShardAllocationDecision.no(allocationStatus, "something is wrong");
|
||||||
assertTrue(noDecision.isDecisionTaken());
|
assertTrue(noDecision.isDecisionTaken());
|
||||||
assertEquals(Decision.Type.NO, noDecision.getFinalDecision());
|
assertEquals(Decision.Type.NO, noDecision.getFinalDecisionType());
|
||||||
assertEquals(allocationStatus, noDecision.getAllocationStatus());
|
assertEquals(allocationStatus, noDecision.getAllocationStatus());
|
||||||
assertEquals("something is wrong", noDecision.getFinalExplanation());
|
assertEquals("something is wrong", noDecision.getFinalExplanation());
|
||||||
assertNull(noDecision.getNodeDecisions());
|
assertNull(noDecision.getNodeDecisions());
|
||||||
assertNull(noDecision.getAssignedNodeId());
|
assertNull(noDecision.getAssignedNodeId());
|
||||||
assertNull(noDecision.getAllocationId());
|
assertNull(noDecision.getAllocationId());
|
||||||
|
|
||||||
Map<String, Decision> nodeDecisions = new HashMap<>();
|
Map<String, ShardAllocationDecision.WeightedDecision> nodeDecisions = new HashMap<>();
|
||||||
nodeDecisions.put("node1", Decision.NO);
|
nodeDecisions.put("node1", new ShardAllocationDecision.WeightedDecision(Decision.NO));
|
||||||
nodeDecisions.put("node2", Decision.NO);
|
nodeDecisions.put("node2", new ShardAllocationDecision.WeightedDecision(Decision.NO));
|
||||||
noDecision = ShardAllocationDecision.no(AllocationStatus.DECIDERS_NO, "something is wrong", nodeDecisions);
|
noDecision = ShardAllocationDecision.no(AllocationStatus.DECIDERS_NO, "something is wrong",
|
||||||
|
nodeDecisions.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().getDecision()))
|
||||||
|
);
|
||||||
assertTrue(noDecision.isDecisionTaken());
|
assertTrue(noDecision.isDecisionTaken());
|
||||||
assertEquals(Decision.Type.NO, noDecision.getFinalDecision());
|
assertEquals(Decision.Type.NO, noDecision.getFinalDecisionType());
|
||||||
assertEquals(AllocationStatus.DECIDERS_NO, noDecision.getAllocationStatus());
|
assertEquals(AllocationStatus.DECIDERS_NO, noDecision.getAllocationStatus());
|
||||||
assertEquals("something is wrong", noDecision.getFinalExplanation());
|
assertEquals("something is wrong", noDecision.getFinalExplanation());
|
||||||
assertEquals(nodeDecisions, noDecision.getNodeDecisions());
|
assertEquals(nodeDecisions, noDecision.getNodeDecisions());
|
||||||
|
@ -72,16 +76,18 @@ public class ShardAllocationDecisionTests extends ESTestCase {
|
||||||
assertNull(noDecision.getAllocationId());
|
assertNull(noDecision.getAllocationId());
|
||||||
|
|
||||||
// test bad values
|
// test bad values
|
||||||
expectThrows(NullPointerException.class, () -> ShardAllocationDecision.no(null, "a"));
|
expectThrows(NullPointerException.class, () -> ShardAllocationDecision.no((AllocationStatus)null, "a"));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testThrottleDecision() {
|
public void testThrottleDecision() {
|
||||||
Map<String, Decision> nodeDecisions = new HashMap<>();
|
Map<String, WeightedDecision> nodeDecisions = new HashMap<>();
|
||||||
nodeDecisions.put("node1", Decision.NO);
|
nodeDecisions.put("node1", new ShardAllocationDecision.WeightedDecision(Decision.NO));
|
||||||
nodeDecisions.put("node2", Decision.THROTTLE);
|
nodeDecisions.put("node2", new ShardAllocationDecision.WeightedDecision(Decision.THROTTLE));
|
||||||
ShardAllocationDecision throttleDecision = ShardAllocationDecision.throttle("too much happening", nodeDecisions);
|
ShardAllocationDecision throttleDecision = ShardAllocationDecision.throttle("too much happening",
|
||||||
|
nodeDecisions.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().getDecision()))
|
||||||
|
);
|
||||||
assertTrue(throttleDecision.isDecisionTaken());
|
assertTrue(throttleDecision.isDecisionTaken());
|
||||||
assertEquals(Decision.Type.THROTTLE, throttleDecision.getFinalDecision());
|
assertEquals(Decision.Type.THROTTLE, throttleDecision.getFinalDecisionType());
|
||||||
assertEquals(AllocationStatus.DECIDERS_THROTTLED, throttleDecision.getAllocationStatus());
|
assertEquals(AllocationStatus.DECIDERS_THROTTLED, throttleDecision.getAllocationStatus());
|
||||||
assertEquals("too much happening", throttleDecision.getFinalExplanation());
|
assertEquals("too much happening", throttleDecision.getFinalExplanation());
|
||||||
assertEquals(nodeDecisions, throttleDecision.getNodeDecisions());
|
assertEquals(nodeDecisions, throttleDecision.getNodeDecisions());
|
||||||
|
@ -90,15 +96,17 @@ public class ShardAllocationDecisionTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testYesDecision() {
|
public void testYesDecision() {
|
||||||
Map<String, Decision> nodeDecisions = new HashMap<>();
|
Map<String, ShardAllocationDecision.WeightedDecision> nodeDecisions = new HashMap<>();
|
||||||
nodeDecisions.put("node1", Decision.YES);
|
nodeDecisions.put("node1", new ShardAllocationDecision.WeightedDecision(Decision.YES));
|
||||||
nodeDecisions.put("node2", Decision.NO);
|
nodeDecisions.put("node2", new ShardAllocationDecision.WeightedDecision(Decision.NO));
|
||||||
String allocId = randomBoolean() ? "allocId" : null;
|
String allocId = randomBoolean() ? "allocId" : null;
|
||||||
ShardAllocationDecision yesDecision = ShardAllocationDecision.yes(
|
ShardAllocationDecision yesDecision = ShardAllocationDecision.yes(
|
||||||
"node1", "node was very kind", allocId, nodeDecisions
|
"node1", "node was very kind", allocId, nodeDecisions.entrySet().stream().collect(
|
||||||
|
Collectors.toMap(Map.Entry::getKey, e -> e.getValue().getDecision())
|
||||||
|
)
|
||||||
);
|
);
|
||||||
assertTrue(yesDecision.isDecisionTaken());
|
assertTrue(yesDecision.isDecisionTaken());
|
||||||
assertEquals(Decision.Type.YES, yesDecision.getFinalDecision());
|
assertEquals(Decision.Type.YES, yesDecision.getFinalDecisionType());
|
||||||
assertNull(yesDecision.getAllocationStatus());
|
assertNull(yesDecision.getAllocationStatus());
|
||||||
assertEquals("node was very kind", yesDecision.getFinalExplanation());
|
assertEquals("node was very kind", yesDecision.getFinalExplanation());
|
||||||
assertEquals(nodeDecisions, yesDecision.getNodeDecisions());
|
assertEquals(nodeDecisions, yesDecision.getNodeDecisions());
|
||||||
|
|
|
@ -0,0 +1,53 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Elasticsearch under one or more contributor
|
||||||
|
* license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright
|
||||||
|
* ownership. Elasticsearch licenses this file to you under
|
||||||
|
* the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
* not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.elasticsearch.cluster.routing.allocation.decider;
|
||||||
|
|
||||||
|
import org.elasticsearch.cluster.routing.allocation.decider.Decision.Type;
|
||||||
|
import org.elasticsearch.test.ESTestCase;
|
||||||
|
|
||||||
|
import static org.elasticsearch.cluster.routing.allocation.decider.Decision.Type.NO;
|
||||||
|
import static org.elasticsearch.cluster.routing.allocation.decider.Decision.Type.THROTTLE;
|
||||||
|
import static org.elasticsearch.cluster.routing.allocation.decider.Decision.Type.YES;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A class for unit testing the {@link Decision} class.
|
||||||
|
*/
|
||||||
|
public class DecisionTests extends ESTestCase {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests {@link Type#higherThan(Type)}
|
||||||
|
*/
|
||||||
|
public void testHigherThan() {
|
||||||
|
// test YES type
|
||||||
|
assertTrue(YES.higherThan(NO));
|
||||||
|
assertTrue(YES.higherThan(THROTTLE));
|
||||||
|
assertFalse(YES.higherThan(YES));
|
||||||
|
|
||||||
|
// test THROTTLE type
|
||||||
|
assertTrue(THROTTLE.higherThan(NO));
|
||||||
|
assertFalse(THROTTLE.higherThan(THROTTLE));
|
||||||
|
assertFalse(THROTTLE.higherThan(YES));
|
||||||
|
|
||||||
|
// test NO type
|
||||||
|
assertFalse(NO.higherThan(NO));
|
||||||
|
assertFalse(NO.higherThan(THROTTLE));
|
||||||
|
assertFalse(NO.higherThan(YES));
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue