Shard Decision class improvements for Explain API (#20742)
This commit improves the shard decision container class in the following ways: 1. Renames UnassignedShardDecision to ShardAllocationDecision, so that the class can be used for general shard decisions, not just unassigned shard decisions. 2. Changes ShardAllocationDecision to have the final decision as a Type instead of a Decision, because all the information needed from the final decision is contained in `Type`. 3. Uses cached instances of ShardAllocationDecision for NO and THROTTLE decisions when no explanation is needed (which is the common case when executing reroute's as opposed to using the explain API).
This commit is contained in:
parent
e83e8e890e
commit
5d38248afa
|
@ -25,19 +25,39 @@ import org.elasticsearch.cluster.routing.allocation.decider.Decision.Type;
|
|||
import org.elasticsearch.common.Nullable;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* Represents the allocation decision by an allocator for an unassigned shard.
|
||||
* Represents the allocation decision by an allocator for a shard.
|
||||
*/
|
||||
public class UnassignedShardDecision {
|
||||
public class ShardAllocationDecision {
|
||||
/** a constant representing a shard decision where no decision was taken */
|
||||
public static final UnassignedShardDecision DECISION_NOT_TAKEN =
|
||||
new UnassignedShardDecision(null, null, null, null, null, null);
|
||||
public static final ShardAllocationDecision DECISION_NOT_TAKEN =
|
||||
new ShardAllocationDecision(null, null, null, null, null, null);
|
||||
/**
|
||||
* a map of cached common no/throttle decisions that don't need explanations,
|
||||
* this helps prevent unnecessary object allocations for the non-explain API case
|
||||
*/
|
||||
private static final Map<AllocationStatus, ShardAllocationDecision> CACHED_DECISIONS;
|
||||
static {
|
||||
Map<AllocationStatus, ShardAllocationDecision> cachedDecisions = new HashMap<>();
|
||||
cachedDecisions.put(AllocationStatus.FETCHING_SHARD_DATA,
|
||||
new ShardAllocationDecision(Type.NO, AllocationStatus.FETCHING_SHARD_DATA, null, null, null, null));
|
||||
cachedDecisions.put(AllocationStatus.NO_VALID_SHARD_COPY,
|
||||
new ShardAllocationDecision(Type.NO, AllocationStatus.NO_VALID_SHARD_COPY, null, null, null, null));
|
||||
cachedDecisions.put(AllocationStatus.DECIDERS_NO,
|
||||
new ShardAllocationDecision(Type.NO, AllocationStatus.DECIDERS_NO, null, null, null, null));
|
||||
cachedDecisions.put(AllocationStatus.DECIDERS_THROTTLED,
|
||||
new ShardAllocationDecision(Type.THROTTLE, AllocationStatus.DECIDERS_THROTTLED, null, null, null, null));
|
||||
cachedDecisions.put(AllocationStatus.DELAYED_ALLOCATION,
|
||||
new ShardAllocationDecision(Type.NO, AllocationStatus.DELAYED_ALLOCATION, null, null, null, null));
|
||||
CACHED_DECISIONS = Collections.unmodifiableMap(cachedDecisions);
|
||||
}
|
||||
|
||||
@Nullable
|
||||
private final Decision finalDecision;
|
||||
private final Type finalDecision;
|
||||
@Nullable
|
||||
private final AllocationStatus allocationStatus;
|
||||
@Nullable
|
||||
|
@ -49,17 +69,15 @@ public class UnassignedShardDecision {
|
|||
@Nullable
|
||||
private final Map<String, Decision> nodeDecisions;
|
||||
|
||||
private UnassignedShardDecision(Decision finalDecision,
|
||||
private ShardAllocationDecision(Type finalDecision,
|
||||
AllocationStatus allocationStatus,
|
||||
String finalExplanation,
|
||||
String assignedNodeId,
|
||||
String allocationId,
|
||||
Map<String, Decision> nodeDecisions) {
|
||||
assert finalExplanation != null || finalDecision == null :
|
||||
"if a decision was taken, there must be an explanation for it";
|
||||
assert assignedNodeId != null || finalDecision == null || finalDecision.type() != Type.YES :
|
||||
assert assignedNodeId != null || finalDecision == null || finalDecision != Type.YES :
|
||||
"a yes decision must have a node to assign the shard to";
|
||||
assert allocationStatus != null || finalDecision == null || finalDecision.type() == Type.YES :
|
||||
assert allocationStatus != null || finalDecision == null || finalDecision == Type.YES :
|
||||
"only a yes decision should not have an allocation status";
|
||||
assert allocationId == null || assignedNodeId != null :
|
||||
"allocation id can only be null if the assigned node is null";
|
||||
|
@ -72,33 +90,36 @@ public class UnassignedShardDecision {
|
|||
}
|
||||
|
||||
/**
|
||||
* Creates a NO decision with the given {@link AllocationStatus} and explanation for the NO decision.
|
||||
* Returns a NO decision with the given {@link AllocationStatus} and explanation for the NO decision, if in explain mode.
|
||||
*/
|
||||
public static UnassignedShardDecision noDecision(AllocationStatus allocationStatus, String explanation) {
|
||||
return noDecision(allocationStatus, explanation, null);
|
||||
public static ShardAllocationDecision no(AllocationStatus allocationStatus, @Nullable String explanation) {
|
||||
return no(allocationStatus, explanation, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a NO decision with the given {@link AllocationStatus} and explanation for the NO decision,
|
||||
* as well as the individual node-level decisions that comprised the final NO decision.
|
||||
* Returns a NO decision with the given {@link AllocationStatus}, and the explanation for the NO decision
|
||||
* as well as the individual node-level decisions that comprised the final NO decision if in explain mode.
|
||||
*/
|
||||
public static UnassignedShardDecision noDecision(AllocationStatus allocationStatus,
|
||||
String explanation,
|
||||
@Nullable Map<String, Decision> nodeDecisions) {
|
||||
Objects.requireNonNull(explanation, "explanation must not be null");
|
||||
public static ShardAllocationDecision no(AllocationStatus allocationStatus, @Nullable String explanation,
|
||||
@Nullable Map<String, Decision> nodeDecisions) {
|
||||
Objects.requireNonNull(allocationStatus, "allocationStatus must not be null");
|
||||
return new UnassignedShardDecision(Decision.NO, allocationStatus, explanation, null, null, nodeDecisions);
|
||||
if (explanation != null) {
|
||||
return new ShardAllocationDecision(Type.NO, allocationStatus, explanation, null, null, nodeDecisions);
|
||||
} else {
|
||||
return getCachedDecision(allocationStatus);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a THROTTLE decision with the given explanation and individual node-level decisions that
|
||||
* comprised the final THROTTLE decision.
|
||||
* Returns a THROTTLE decision, with the given explanation and individual node-level decisions that
|
||||
* comprised the final THROTTLE decision if in explain mode.
|
||||
*/
|
||||
public static UnassignedShardDecision throttleDecision(String explanation,
|
||||
Map<String, Decision> nodeDecisions) {
|
||||
Objects.requireNonNull(explanation, "explanation must not be null");
|
||||
return new UnassignedShardDecision(Decision.THROTTLE, AllocationStatus.DECIDERS_THROTTLED, explanation, null, null,
|
||||
nodeDecisions);
|
||||
public static ShardAllocationDecision throttle(@Nullable String explanation, @Nullable Map<String, Decision> nodeDecisions) {
|
||||
if (explanation != null) {
|
||||
return new ShardAllocationDecision(Type.THROTTLE, AllocationStatus.DECIDERS_THROTTLED, explanation, null, null, nodeDecisions);
|
||||
} else {
|
||||
return getCachedDecision(AllocationStatus.DECIDERS_THROTTLED);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -106,13 +127,15 @@ public class UnassignedShardDecision {
|
|||
* comprised the final YES decision, along with the node id to which the shard is assigned and
|
||||
* the allocation id for the shard, if available.
|
||||
*/
|
||||
public static UnassignedShardDecision yesDecision(String explanation,
|
||||
String assignedNodeId,
|
||||
@Nullable String allocationId,
|
||||
Map<String, Decision> nodeDecisions) {
|
||||
Objects.requireNonNull(explanation, "explanation must not be null");
|
||||
public static ShardAllocationDecision yes(String assignedNodeId, @Nullable String explanation, @Nullable String allocationId,
|
||||
@Nullable Map<String, Decision> nodeDecisions) {
|
||||
Objects.requireNonNull(assignedNodeId, "assignedNodeId must not be null");
|
||||
return new UnassignedShardDecision(Decision.YES, null, explanation, assignedNodeId, allocationId, nodeDecisions);
|
||||
return new ShardAllocationDecision(Type.YES, null, explanation, assignedNodeId, allocationId, nodeDecisions);
|
||||
}
|
||||
|
||||
private static ShardAllocationDecision getCachedDecision(AllocationStatus allocationStatus) {
|
||||
ShardAllocationDecision decision = CACHED_DECISIONS.get(allocationStatus);
|
||||
return Objects.requireNonNull(decision, "precomputed decision not found for " + allocationStatus);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -124,20 +147,20 @@ public class UnassignedShardDecision {
|
|||
}
|
||||
|
||||
/**
|
||||
* Returns the final decision made by the allocator on whether to assign the unassigned shard.
|
||||
* Returns the final decision made by the allocator on whether to assign the shard.
|
||||
* This value can only be {@code null} if {@link #isDecisionTaken()} returns {@code false}.
|
||||
*/
|
||||
@Nullable
|
||||
public Decision getFinalDecision() {
|
||||
public Type getFinalDecision() {
|
||||
return finalDecision;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the final decision made by the allocator on whether to assign the unassigned shard.
|
||||
* Returns the final decision made by the allocator on whether to assign the shard.
|
||||
* Only call this method if {@link #isDecisionTaken()} returns {@code true}, otherwise it will
|
||||
* throw an {@code IllegalArgumentException}.
|
||||
*/
|
||||
public Decision getFinalDecisionSafe() {
|
||||
public Type getFinalDecisionSafe() {
|
||||
if (isDecisionTaken() == false) {
|
||||
throw new IllegalArgumentException("decision must have been taken in order to return the final decision");
|
||||
}
|
||||
|
@ -161,18 +184,6 @@ public class UnassignedShardDecision {
|
|||
return finalExplanation;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the free-text explanation for the reason behind the decision taken in {@link #getFinalDecision()}.
|
||||
* Only call this method if {@link #isDecisionTaken()} returns {@code true}, otherwise it will
|
||||
* throw an {@code IllegalArgumentException}.
|
||||
*/
|
||||
public String getFinalExplanationSafe() {
|
||||
if (isDecisionTaken() == false) {
|
||||
throw new IllegalArgumentException("decision must have been taken in order to return the final explanation");
|
||||
}
|
||||
return finalExplanation;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the node id that the allocator will assign the shard to, unless {@link #getFinalDecision()} returns
|
||||
* a value other than {@link Decision.Type#YES}, in which case this returns {@code null}.
|
|
@ -23,7 +23,7 @@ import org.apache.logging.log4j.Logger;
|
|||
import org.elasticsearch.cluster.routing.RoutingNodes;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
||||
import org.elasticsearch.cluster.routing.allocation.UnassignedShardDecision;
|
||||
import org.elasticsearch.cluster.routing.allocation.ShardAllocationDecision;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
@ -53,21 +53,21 @@ public abstract class BaseGatewayShardAllocator extends AbstractComponent {
|
|||
final RoutingNodes.UnassignedShards.UnassignedIterator unassignedIterator = routingNodes.unassigned().iterator();
|
||||
while (unassignedIterator.hasNext()) {
|
||||
final ShardRouting shard = unassignedIterator.next();
|
||||
final UnassignedShardDecision unassignedShardDecision = makeAllocationDecision(shard, allocation, logger);
|
||||
final ShardAllocationDecision shardAllocationDecision = makeAllocationDecision(shard, allocation, logger);
|
||||
|
||||
if (unassignedShardDecision.isDecisionTaken() == false) {
|
||||
if (shardAllocationDecision.isDecisionTaken() == false) {
|
||||
// no decision was taken by this allocator
|
||||
continue;
|
||||
}
|
||||
|
||||
if (unassignedShardDecision.getFinalDecisionSafe().type() == Decision.Type.YES) {
|
||||
unassignedIterator.initialize(unassignedShardDecision.getAssignedNodeId(),
|
||||
unassignedShardDecision.getAllocationId(),
|
||||
if (shardAllocationDecision.getFinalDecisionSafe() == Decision.Type.YES) {
|
||||
unassignedIterator.initialize(shardAllocationDecision.getAssignedNodeId(),
|
||||
shardAllocationDecision.getAllocationId(),
|
||||
shard.primary() ? ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE :
|
||||
allocation.clusterInfo().getShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE),
|
||||
allocation.changes());
|
||||
} else {
|
||||
unassignedIterator.removeAndIgnore(unassignedShardDecision.getAllocationStatus(), allocation.changes());
|
||||
unassignedIterator.removeAndIgnore(shardAllocationDecision.getAllocationStatus(), allocation.changes());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -80,9 +80,9 @@ public abstract class BaseGatewayShardAllocator extends AbstractComponent {
|
|||
* @param unassignedShard the unassigned shard to allocate
|
||||
* @param allocation the current routing state
|
||||
* @param logger the logger
|
||||
* @return an {@link UnassignedShardDecision} with the final decision of whether to allocate and details of the decision
|
||||
* @return an {@link ShardAllocationDecision} with the final decision of whether to allocate and details of the decision
|
||||
*/
|
||||
public abstract UnassignedShardDecision makeAllocationDecision(ShardRouting unassignedShard,
|
||||
public abstract ShardAllocationDecision makeAllocationDecision(ShardRouting unassignedShard,
|
||||
RoutingAllocation allocation,
|
||||
Logger logger);
|
||||
}
|
||||
|
|
|
@ -32,7 +32,7 @@ import org.elasticsearch.cluster.routing.RoutingNodes;
|
|||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.UnassignedInfo.AllocationStatus;
|
||||
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
||||
import org.elasticsearch.cluster.routing.allocation.UnassignedShardDecision;
|
||||
import org.elasticsearch.cluster.routing.allocation.ShardAllocationDecision;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.Decision.Type;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
|
@ -110,20 +110,20 @@ public abstract class PrimaryShardAllocator extends BaseGatewayShardAllocator {
|
|||
}
|
||||
|
||||
@Override
|
||||
public UnassignedShardDecision makeAllocationDecision(final ShardRouting unassignedShard,
|
||||
public ShardAllocationDecision makeAllocationDecision(final ShardRouting unassignedShard,
|
||||
final RoutingAllocation allocation,
|
||||
final Logger logger) {
|
||||
if (isResponsibleFor(unassignedShard) == false) {
|
||||
// this allocator is not responsible for allocating this shard
|
||||
return UnassignedShardDecision.DECISION_NOT_TAKEN;
|
||||
return ShardAllocationDecision.DECISION_NOT_TAKEN;
|
||||
}
|
||||
|
||||
final boolean explain = allocation.debugDecision();
|
||||
final FetchResult<NodeGatewayStartedShards> shardState = fetchData(unassignedShard, allocation);
|
||||
if (shardState.hasData() == false) {
|
||||
allocation.setHasPendingAsyncFetch();
|
||||
return UnassignedShardDecision.noDecision(AllocationStatus.FETCHING_SHARD_DATA,
|
||||
"still fetching shard state from the nodes in the cluster");
|
||||
return ShardAllocationDecision.no(AllocationStatus.FETCHING_SHARD_DATA,
|
||||
explain ? "still fetching shard state from the nodes in the cluster" : null);
|
||||
}
|
||||
|
||||
// don't create a new IndexSetting object for every shard as this could cause a lot of garbage
|
||||
|
@ -167,19 +167,19 @@ public abstract class PrimaryShardAllocator extends BaseGatewayShardAllocator {
|
|||
// let BalancedShardsAllocator take care of allocating this shard
|
||||
logger.debug("[{}][{}]: missing local data, will restore from [{}]",
|
||||
unassignedShard.index(), unassignedShard.id(), unassignedShard.recoverySource());
|
||||
return UnassignedShardDecision.DECISION_NOT_TAKEN;
|
||||
return ShardAllocationDecision.DECISION_NOT_TAKEN;
|
||||
} else if (recoverOnAnyNode) {
|
||||
// let BalancedShardsAllocator take care of allocating this shard
|
||||
logger.debug("[{}][{}]: missing local data, recover from any node", unassignedShard.index(), unassignedShard.id());
|
||||
return UnassignedShardDecision.DECISION_NOT_TAKEN;
|
||||
return ShardAllocationDecision.DECISION_NOT_TAKEN;
|
||||
} else {
|
||||
// We have a shard that was previously allocated, but we could not find a valid shard copy to allocate the primary.
|
||||
// We could just be waiting for the node that holds the primary to start back up, in which case the allocation for
|
||||
// this shard will be picked up when the node joins and we do another allocation reroute
|
||||
logger.debug("[{}][{}]: not allocating, number_of_allocated_shards_found [{}]",
|
||||
unassignedShard.index(), unassignedShard.id(), nodeShardsResult.allocationsFound);
|
||||
return UnassignedShardDecision.noDecision(AllocationStatus.NO_VALID_SHARD_COPY,
|
||||
"shard was previously allocated, but no valid shard copy could be found amongst the current nodes in the cluster");
|
||||
return ShardAllocationDecision.no(AllocationStatus.NO_VALID_SHARD_COPY,
|
||||
explain ? "shard was previously allocated, but no valid shard copy could be found amongst the nodes in the cluster" : null);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -191,9 +191,10 @@ public abstract class PrimaryShardAllocator extends BaseGatewayShardAllocator {
|
|||
logger.debug("[{}][{}]: allocating [{}] to [{}] on primary allocation",
|
||||
unassignedShard.index(), unassignedShard.id(), unassignedShard, decidedNode.nodeShardState.getNode());
|
||||
final String nodeId = decidedNode.nodeShardState.getNode().getId();
|
||||
return UnassignedShardDecision.yesDecision(
|
||||
return ShardAllocationDecision.yes(nodeId,
|
||||
"the allocation deciders returned a YES decision to allocate to node [" + nodeId + "]",
|
||||
nodeId, decidedNode.nodeShardState.allocationId(), buildNodeDecisions(nodesToAllocate, explain));
|
||||
decidedNode.nodeShardState.allocationId(),
|
||||
buildNodeDecisions(nodesToAllocate, explain));
|
||||
} else if (nodesToAllocate.throttleNodeShards.isEmpty() == true && nodesToAllocate.noNodeShards.isEmpty() == false) {
|
||||
// The deciders returned a NO decision for all nodes with shard copies, so we check if primary shard
|
||||
// can be force-allocated to one of the nodes.
|
||||
|
@ -206,22 +207,21 @@ public abstract class PrimaryShardAllocator extends BaseGatewayShardAllocator {
|
|||
logger.debug("[{}][{}]: allocating [{}] to [{}] on forced primary allocation",
|
||||
unassignedShard.index(), unassignedShard.id(), unassignedShard, nodeShardState.getNode());
|
||||
final String nodeId = nodeShardState.getNode().getId();
|
||||
return UnassignedShardDecision.yesDecision(
|
||||
return ShardAllocationDecision.yes(nodeId,
|
||||
"allocating the primary shard to node [" + nodeId+ "], which has a complete copy of the shard data",
|
||||
nodeId,
|
||||
nodeShardState.allocationId(),
|
||||
buildNodeDecisions(nodesToForceAllocate, explain));
|
||||
} else if (nodesToForceAllocate.throttleNodeShards.isEmpty() == false) {
|
||||
logger.debug("[{}][{}]: throttling allocation [{}] to [{}] on forced primary allocation",
|
||||
unassignedShard.index(), unassignedShard.id(), unassignedShard, nodesToForceAllocate.throttleNodeShards);
|
||||
return UnassignedShardDecision.throttleDecision(
|
||||
"allocation throttled as all nodes to which the shard may be force allocated are busy with other recoveries",
|
||||
return ShardAllocationDecision.throttle(
|
||||
explain ? "allocation throttled as all nodes to which the shard may be force allocated are busy with other recoveries" : null,
|
||||
buildNodeDecisions(nodesToForceAllocate, explain));
|
||||
} else {
|
||||
logger.debug("[{}][{}]: forced primary allocation denied [{}]",
|
||||
unassignedShard.index(), unassignedShard.id(), unassignedShard);
|
||||
return UnassignedShardDecision.noDecision(AllocationStatus.DECIDERS_NO,
|
||||
"all nodes that hold a valid shard copy returned a NO decision, and force allocation is not permitted",
|
||||
return ShardAllocationDecision.no(AllocationStatus.DECIDERS_NO,
|
||||
explain ? "all nodes that hold a valid shard copy returned a NO decision, and force allocation is not permitted" : null,
|
||||
buildNodeDecisions(nodesToForceAllocate, explain));
|
||||
}
|
||||
} else {
|
||||
|
@ -229,8 +229,8 @@ public abstract class PrimaryShardAllocator extends BaseGatewayShardAllocator {
|
|||
// taking place on the node currently, ignore it for now
|
||||
logger.debug("[{}][{}]: throttling allocation [{}] to [{}] on primary allocation",
|
||||
unassignedShard.index(), unassignedShard.id(), unassignedShard, nodesToAllocate.throttleNodeShards);
|
||||
return UnassignedShardDecision.throttleDecision(
|
||||
"allocation throttled as all nodes to which the shard may be allocated are busy with other recoveries",
|
||||
return ShardAllocationDecision.throttle(
|
||||
explain ? "allocation throttled as all nodes to which the shard may be allocated are busy with other recoveries" : null,
|
||||
buildNodeDecisions(nodesToAllocate, explain));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -32,7 +32,7 @@ import org.elasticsearch.cluster.routing.ShardRouting;
|
|||
import org.elasticsearch.cluster.routing.UnassignedInfo;
|
||||
import org.elasticsearch.cluster.routing.UnassignedInfo.AllocationStatus;
|
||||
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
||||
import org.elasticsearch.cluster.routing.allocation.UnassignedShardDecision;
|
||||
import org.elasticsearch.cluster.routing.allocation.ShardAllocationDecision;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
|
@ -40,6 +40,7 @@ import org.elasticsearch.common.settings.Settings;
|
|||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.index.store.StoreFileMetaData;
|
||||
import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData;
|
||||
import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
|
@ -81,7 +82,7 @@ public abstract class ReplicaShardAllocator extends BaseGatewayShardAllocator {
|
|||
continue;
|
||||
}
|
||||
|
||||
AsyncShardFetch.FetchResult<TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData> shardStores = fetchData(shard, allocation);
|
||||
AsyncShardFetch.FetchResult<NodeStoreFilesMetaData> shardStores = fetchData(shard, allocation);
|
||||
if (shardStores.hasData() == false) {
|
||||
logger.trace("{}: fetching new stores for initializing shard", shard);
|
||||
continue; // still fetching
|
||||
|
@ -140,12 +141,12 @@ public abstract class ReplicaShardAllocator extends BaseGatewayShardAllocator {
|
|||
}
|
||||
|
||||
@Override
|
||||
public UnassignedShardDecision makeAllocationDecision(final ShardRouting unassignedShard,
|
||||
public ShardAllocationDecision makeAllocationDecision(final ShardRouting unassignedShard,
|
||||
final RoutingAllocation allocation,
|
||||
final Logger logger) {
|
||||
if (isResponsibleFor(unassignedShard) == false) {
|
||||
// this allocator is not responsible for deciding on this shard
|
||||
return UnassignedShardDecision.DECISION_NOT_TAKEN;
|
||||
return ShardAllocationDecision.DECISION_NOT_TAKEN;
|
||||
}
|
||||
|
||||
final RoutingNodes routingNodes = allocation.routingNodes();
|
||||
|
@ -154,17 +155,17 @@ public abstract class ReplicaShardAllocator extends BaseGatewayShardAllocator {
|
|||
Tuple<Decision, Map<String, Decision>> allocateDecision = canBeAllocatedToAtLeastOneNode(unassignedShard, allocation, explain);
|
||||
if (allocateDecision.v1().type() != Decision.Type.YES) {
|
||||
logger.trace("{}: ignoring allocation, can't be allocated on any node", unassignedShard);
|
||||
return UnassignedShardDecision.noDecision(UnassignedInfo.AllocationStatus.fromDecision(allocateDecision.v1()),
|
||||
"all nodes returned a " + allocateDecision.v1().type() + " decision for allocating the replica shard",
|
||||
return ShardAllocationDecision.no(UnassignedInfo.AllocationStatus.fromDecision(allocateDecision.v1()),
|
||||
explain ? "all nodes returned a " + allocateDecision.v1().type() + " decision for allocating the replica shard" : null,
|
||||
allocateDecision.v2());
|
||||
}
|
||||
|
||||
AsyncShardFetch.FetchResult<TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData> shardStores = fetchData(unassignedShard, allocation);
|
||||
AsyncShardFetch.FetchResult<NodeStoreFilesMetaData> shardStores = fetchData(unassignedShard, allocation);
|
||||
if (shardStores.hasData() == false) {
|
||||
logger.trace("{}: ignoring allocation, still fetching shard stores", unassignedShard);
|
||||
allocation.setHasPendingAsyncFetch();
|
||||
return UnassignedShardDecision.noDecision(AllocationStatus.FETCHING_SHARD_DATA,
|
||||
"still fetching shard state from the nodes in the cluster");
|
||||
return ShardAllocationDecision.no(AllocationStatus.FETCHING_SHARD_DATA,
|
||||
explain ? "still fetching shard state from the nodes in the cluster" : null);
|
||||
}
|
||||
|
||||
ShardRouting primaryShard = routingNodes.activePrimary(unassignedShard.shardId());
|
||||
|
@ -176,7 +177,7 @@ public abstract class ReplicaShardAllocator extends BaseGatewayShardAllocator {
|
|||
// will try and recover from
|
||||
// Note, this is the existing behavior, as exposed in running CorruptFileTest#testNoPrimaryData
|
||||
logger.trace("{}: no primary shard store found or allocated, letting actual allocation figure it out", unassignedShard);
|
||||
return UnassignedShardDecision.DECISION_NOT_TAKEN;
|
||||
return ShardAllocationDecision.DECISION_NOT_TAKEN;
|
||||
}
|
||||
|
||||
MatchingNodes matchingNodes = findMatchingNodes(unassignedShard, allocation, primaryStore, shardStores, explain);
|
||||
|
@ -190,27 +191,28 @@ public abstract class ReplicaShardAllocator extends BaseGatewayShardAllocator {
|
|||
logger.debug("[{}][{}]: throttling allocation [{}] to [{}] in order to reuse its unallocated persistent store",
|
||||
unassignedShard.index(), unassignedShard.id(), unassignedShard, nodeWithHighestMatch.node());
|
||||
// we are throttling this, as we have enough other shards to allocate to this node, so ignore it for now
|
||||
return UnassignedShardDecision.throttleDecision(
|
||||
"returned a THROTTLE decision on each node that has an existing copy of the shard, so waiting to re-use one " +
|
||||
"of those copies", matchingNodes.nodeDecisions);
|
||||
return ShardAllocationDecision.throttle(
|
||||
explain ? "returned a THROTTLE decision on each node that has an existing copy of the shard, so waiting to re-use one of those copies" : null,
|
||||
matchingNodes.nodeDecisions);
|
||||
} else {
|
||||
logger.debug("[{}][{}]: allocating [{}] to [{}] in order to reuse its unallocated persistent store",
|
||||
unassignedShard.index(), unassignedShard.id(), unassignedShard, nodeWithHighestMatch.node());
|
||||
// we found a match
|
||||
return UnassignedShardDecision.yesDecision(
|
||||
return ShardAllocationDecision.yes(nodeWithHighestMatch.nodeId(),
|
||||
"allocating to node [" + nodeWithHighestMatch.nodeId() + "] in order to re-use its unallocated persistent store",
|
||||
nodeWithHighestMatch.nodeId(), null, matchingNodes.nodeDecisions);
|
||||
null,
|
||||
matchingNodes.nodeDecisions);
|
||||
}
|
||||
} else if (matchingNodes.hasAnyData() == false && unassignedShard.unassignedInfo().isDelayed()) {
|
||||
// if we didn't manage to find *any* data (regardless of matching sizes), and the replica is
|
||||
// unassigned due to a node leaving, so we delay allocation of this replica to see if the
|
||||
// node with the shard copy will rejoin so we can re-use the copy it has
|
||||
logger.debug("{}: allocation of [{}] is delayed", unassignedShard.shardId(), unassignedShard);
|
||||
return UnassignedShardDecision.noDecision(AllocationStatus.DELAYED_ALLOCATION,
|
||||
"not allocating this shard, no nodes contain data for the replica and allocation is delayed");
|
||||
return ShardAllocationDecision.no(AllocationStatus.DELAYED_ALLOCATION,
|
||||
explain ? "not allocating this shard, no nodes contain data for the replica and allocation is delayed" : null);
|
||||
}
|
||||
|
||||
return UnassignedShardDecision.DECISION_NOT_TAKEN;
|
||||
return ShardAllocationDecision.DECISION_NOT_TAKEN;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -250,13 +252,13 @@ public abstract class ReplicaShardAllocator extends BaseGatewayShardAllocator {
|
|||
/**
|
||||
* Finds the store for the assigned shard in the fetched data, returns null if none is found.
|
||||
*/
|
||||
private TransportNodesListShardStoreMetaData.StoreFilesMetaData findStore(ShardRouting shard, RoutingAllocation allocation, AsyncShardFetch.FetchResult<TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData> data) {
|
||||
private TransportNodesListShardStoreMetaData.StoreFilesMetaData findStore(ShardRouting shard, RoutingAllocation allocation, AsyncShardFetch.FetchResult<NodeStoreFilesMetaData> data) {
|
||||
assert shard.currentNodeId() != null;
|
||||
DiscoveryNode primaryNode = allocation.nodes().get(shard.currentNodeId());
|
||||
if (primaryNode == null) {
|
||||
return null;
|
||||
}
|
||||
TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData primaryNodeFilesStore = data.getData().get(primaryNode);
|
||||
NodeStoreFilesMetaData primaryNodeFilesStore = data.getData().get(primaryNode);
|
||||
if (primaryNodeFilesStore == null) {
|
||||
return null;
|
||||
}
|
||||
|
@ -265,11 +267,11 @@ public abstract class ReplicaShardAllocator extends BaseGatewayShardAllocator {
|
|||
|
||||
private MatchingNodes findMatchingNodes(ShardRouting shard, RoutingAllocation allocation,
|
||||
TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore,
|
||||
AsyncShardFetch.FetchResult<TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData> data,
|
||||
AsyncShardFetch.FetchResult<NodeStoreFilesMetaData> data,
|
||||
boolean explain) {
|
||||
ObjectLongMap<DiscoveryNode> nodesToSize = new ObjectLongHashMap<>();
|
||||
Map<String, Decision> nodeDecisions = new HashMap<>();
|
||||
for (Map.Entry<DiscoveryNode, TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData> nodeStoreEntry : data.getData().entrySet()) {
|
||||
for (Map.Entry<DiscoveryNode, NodeStoreFilesMetaData> nodeStoreEntry : data.getData().entrySet()) {
|
||||
DiscoveryNode discoNode = nodeStoreEntry.getKey();
|
||||
TransportNodesListShardStoreMetaData.StoreFilesMetaData storeFilesMetaData = nodeStoreEntry.getValue().storeFilesMetaData();
|
||||
// we don't have any files at all, it is an empty index
|
||||
|
@ -317,7 +319,7 @@ public abstract class ReplicaShardAllocator extends BaseGatewayShardAllocator {
|
|||
return new MatchingNodes(nodesToSize, explain ? nodeDecisions : null);
|
||||
}
|
||||
|
||||
protected abstract AsyncShardFetch.FetchResult<TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData> fetchData(ShardRouting shard, RoutingAllocation allocation);
|
||||
protected abstract AsyncShardFetch.FetchResult<NodeStoreFilesMetaData> fetchData(ShardRouting shard, RoutingAllocation allocation);
|
||||
|
||||
static class MatchingNodes {
|
||||
private final ObjectLongMap<DiscoveryNode> nodesToSize;
|
||||
|
|
|
@ -23,35 +23,36 @@ import org.elasticsearch.cluster.routing.UnassignedInfo.AllocationStatus;
|
|||
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Unit tests for the {@link UnassignedShardDecision} class.
|
||||
* Unit tests for the {@link ShardAllocationDecision} class.
|
||||
*/
|
||||
public class UnassignedShardDecisionTests extends ESTestCase {
|
||||
public class ShardAllocationDecisionTests extends ESTestCase {
|
||||
|
||||
public void testDecisionNotTaken() {
|
||||
UnassignedShardDecision unassignedShardDecision = UnassignedShardDecision.DECISION_NOT_TAKEN;
|
||||
assertFalse(unassignedShardDecision.isDecisionTaken());
|
||||
assertNull(unassignedShardDecision.getFinalDecision());
|
||||
assertNull(unassignedShardDecision.getAllocationStatus());
|
||||
assertNull(unassignedShardDecision.getAllocationId());
|
||||
assertNull(unassignedShardDecision.getAssignedNodeId());
|
||||
assertNull(unassignedShardDecision.getFinalExplanation());
|
||||
assertNull(unassignedShardDecision.getNodeDecisions());
|
||||
expectThrows(IllegalArgumentException.class, () -> unassignedShardDecision.getFinalDecisionSafe());
|
||||
expectThrows(IllegalArgumentException.class, () -> unassignedShardDecision.getFinalExplanationSafe());
|
||||
ShardAllocationDecision shardAllocationDecision = ShardAllocationDecision.DECISION_NOT_TAKEN;
|
||||
assertFalse(shardAllocationDecision.isDecisionTaken());
|
||||
assertNull(shardAllocationDecision.getFinalDecision());
|
||||
assertNull(shardAllocationDecision.getAllocationStatus());
|
||||
assertNull(shardAllocationDecision.getAllocationId());
|
||||
assertNull(shardAllocationDecision.getAssignedNodeId());
|
||||
assertNull(shardAllocationDecision.getFinalExplanation());
|
||||
assertNull(shardAllocationDecision.getNodeDecisions());
|
||||
expectThrows(IllegalArgumentException.class, () -> shardAllocationDecision.getFinalDecisionSafe());
|
||||
}
|
||||
|
||||
public void testNoDecision() {
|
||||
final AllocationStatus allocationStatus = randomFrom(
|
||||
AllocationStatus.DELAYED_ALLOCATION, AllocationStatus.NO_VALID_SHARD_COPY, AllocationStatus.FETCHING_SHARD_DATA
|
||||
);
|
||||
UnassignedShardDecision noDecision = UnassignedShardDecision.noDecision(allocationStatus, "something is wrong");
|
||||
ShardAllocationDecision noDecision = ShardAllocationDecision.no(allocationStatus, "something is wrong");
|
||||
assertTrue(noDecision.isDecisionTaken());
|
||||
assertEquals(Decision.Type.NO, noDecision.getFinalDecision().type());
|
||||
assertEquals(Decision.Type.NO, noDecision.getFinalDecision());
|
||||
assertEquals(allocationStatus, noDecision.getAllocationStatus());
|
||||
assertEquals("something is wrong", noDecision.getFinalExplanation());
|
||||
assertNull(noDecision.getNodeDecisions());
|
||||
|
@ -61,9 +62,9 @@ public class UnassignedShardDecisionTests extends ESTestCase {
|
|||
Map<String, Decision> nodeDecisions = new HashMap<>();
|
||||
nodeDecisions.put("node1", Decision.NO);
|
||||
nodeDecisions.put("node2", Decision.NO);
|
||||
noDecision = UnassignedShardDecision.noDecision(AllocationStatus.DECIDERS_NO, "something is wrong", nodeDecisions);
|
||||
noDecision = ShardAllocationDecision.no(AllocationStatus.DECIDERS_NO, "something is wrong", nodeDecisions);
|
||||
assertTrue(noDecision.isDecisionTaken());
|
||||
assertEquals(Decision.Type.NO, noDecision.getFinalDecision().type());
|
||||
assertEquals(Decision.Type.NO, noDecision.getFinalDecision());
|
||||
assertEquals(AllocationStatus.DECIDERS_NO, noDecision.getAllocationStatus());
|
||||
assertEquals("something is wrong", noDecision.getFinalExplanation());
|
||||
assertEquals(nodeDecisions, noDecision.getNodeDecisions());
|
||||
|
@ -71,25 +72,21 @@ public class UnassignedShardDecisionTests extends ESTestCase {
|
|||
assertNull(noDecision.getAllocationId());
|
||||
|
||||
// test bad values
|
||||
expectThrows(NullPointerException.class, () -> UnassignedShardDecision.noDecision(null, "a"));
|
||||
expectThrows(NullPointerException.class, () -> UnassignedShardDecision.noDecision(AllocationStatus.DECIDERS_NO, null));
|
||||
expectThrows(NullPointerException.class, () -> ShardAllocationDecision.no(null, "a"));
|
||||
}
|
||||
|
||||
public void testThrottleDecision() {
|
||||
Map<String, Decision> nodeDecisions = new HashMap<>();
|
||||
nodeDecisions.put("node1", Decision.NO);
|
||||
nodeDecisions.put("node2", Decision.THROTTLE);
|
||||
UnassignedShardDecision throttleDecision = UnassignedShardDecision.throttleDecision("too much happening", nodeDecisions);
|
||||
ShardAllocationDecision throttleDecision = ShardAllocationDecision.throttle("too much happening", nodeDecisions);
|
||||
assertTrue(throttleDecision.isDecisionTaken());
|
||||
assertEquals(Decision.Type.THROTTLE, throttleDecision.getFinalDecision().type());
|
||||
assertEquals(Decision.Type.THROTTLE, throttleDecision.getFinalDecision());
|
||||
assertEquals(AllocationStatus.DECIDERS_THROTTLED, throttleDecision.getAllocationStatus());
|
||||
assertEquals("too much happening", throttleDecision.getFinalExplanation());
|
||||
assertEquals(nodeDecisions, throttleDecision.getNodeDecisions());
|
||||
assertNull(throttleDecision.getAssignedNodeId());
|
||||
assertNull(throttleDecision.getAllocationId());
|
||||
|
||||
// test bad values
|
||||
expectThrows(NullPointerException.class, () -> UnassignedShardDecision.throttleDecision(null, Collections.emptyMap()));
|
||||
}
|
||||
|
||||
public void testYesDecision() {
|
||||
|
@ -97,20 +94,45 @@ public class UnassignedShardDecisionTests extends ESTestCase {
|
|||
nodeDecisions.put("node1", Decision.YES);
|
||||
nodeDecisions.put("node2", Decision.NO);
|
||||
String allocId = randomBoolean() ? "allocId" : null;
|
||||
UnassignedShardDecision yesDecision = UnassignedShardDecision.yesDecision(
|
||||
"node was very kind", "node1", allocId, nodeDecisions
|
||||
ShardAllocationDecision yesDecision = ShardAllocationDecision.yes(
|
||||
"node1", "node was very kind", allocId, nodeDecisions
|
||||
);
|
||||
assertTrue(yesDecision.isDecisionTaken());
|
||||
assertEquals(Decision.Type.YES, yesDecision.getFinalDecision().type());
|
||||
assertEquals(Decision.Type.YES, yesDecision.getFinalDecision());
|
||||
assertNull(yesDecision.getAllocationStatus());
|
||||
assertEquals("node was very kind", yesDecision.getFinalExplanation());
|
||||
assertEquals(nodeDecisions, yesDecision.getNodeDecisions());
|
||||
assertEquals("node1", yesDecision.getAssignedNodeId());
|
||||
assertEquals(allocId, yesDecision.getAllocationId());
|
||||
|
||||
expectThrows(NullPointerException.class,
|
||||
() -> UnassignedShardDecision.yesDecision(null, "a", randomBoolean() ? "a" : null, Collections.emptyMap()));
|
||||
expectThrows(NullPointerException.class,
|
||||
() -> UnassignedShardDecision.yesDecision("a", null, null, Collections.emptyMap()));
|
||||
}
|
||||
|
||||
public void testCachedDecisions() {
|
||||
List<AllocationStatus> cachableStatuses = Arrays.asList(AllocationStatus.DECIDERS_NO, AllocationStatus.DECIDERS_THROTTLED,
|
||||
AllocationStatus.NO_VALID_SHARD_COPY, AllocationStatus.FETCHING_SHARD_DATA, AllocationStatus.DELAYED_ALLOCATION);
|
||||
for (AllocationStatus allocationStatus : cachableStatuses) {
|
||||
if (allocationStatus == AllocationStatus.DECIDERS_THROTTLED) {
|
||||
ShardAllocationDecision cached = ShardAllocationDecision.throttle(null, null);
|
||||
ShardAllocationDecision another = ShardAllocationDecision.throttle(null, null);
|
||||
assertSame(cached, another);
|
||||
ShardAllocationDecision notCached = ShardAllocationDecision.throttle("abc", null);
|
||||
another = ShardAllocationDecision.throttle("abc", null);
|
||||
assertNotSame(notCached, another);
|
||||
} else {
|
||||
ShardAllocationDecision cached = ShardAllocationDecision.no(allocationStatus, null);
|
||||
ShardAllocationDecision another = ShardAllocationDecision.no(allocationStatus, null);
|
||||
assertSame(cached, another);
|
||||
ShardAllocationDecision notCached = ShardAllocationDecision.no(allocationStatus, "abc");
|
||||
another = ShardAllocationDecision.no(allocationStatus, "abc");
|
||||
assertNotSame(notCached, another);
|
||||
}
|
||||
}
|
||||
|
||||
// yes decisions are not precomputed and cached
|
||||
Map<String, Decision> dummyMap = Collections.emptyMap();
|
||||
ShardAllocationDecision first = ShardAllocationDecision.yes("node1", "abc", "alloc1", dummyMap);
|
||||
ShardAllocationDecision second = ShardAllocationDecision.yes("node1", "abc", "alloc1", dummyMap);
|
||||
// same fields for the ShardAllocationDecision, but should be different instances
|
||||
assertNotSame(first, second);
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue