Split allocator decision making from decision application (#20347)
Splits the PrimaryShardAllocator's and ReplicaShardAllocator's decision making for a shard from the application of that decision to the routing table. This is a step toward making it easier to reuse the same logic for the cluster allocation explain APIs.
This commit is contained in:
parent 69bf08f6c6
commit b1e87aa13c
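
To make the intent of the split concrete, here is a minimal sketch (not code from this commit; the allocator variable and the surrounding method are hypothetical) of how a caller such as the cluster allocation explain API can ask an allocator for its decision and inspect it without mutating the routing table, while allocateUnassigned() below remains the only place where YES decisions are applied:

// hypothetical caller: obtain and inspect a decision without applying it
UnassignedShardDecision decision = allocator.makeAllocationDecision(shard, allocation, logger);
if (decision.isDecisionTaken()) {
    Decision.Type type = decision.getFinalDecisionSafe().type();        // YES, NO, or THROTTLE
    String explanation = decision.getFinalExplanationSafe();            // human-readable reason
    Map<String, Decision> nodeDecisions = decision.getNodeDecisions();  // per-node detail, null unless explain mode
    // only BaseGatewayShardAllocator.allocateUnassigned() goes on to initialize the shard for YES decisions
}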
@@ -0,0 +1,205 @@
/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.cluster.routing.allocation;

import org.elasticsearch.cluster.routing.UnassignedInfo.AllocationStatus;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.cluster.routing.allocation.decider.Decision.Type;
import org.elasticsearch.common.Nullable;

import java.util.Collections;
import java.util.Map;
import java.util.Objects;

/**
 * Represents the allocation decision by an allocator for an unassigned shard.
 */
public class UnassignedShardDecision {
    /** a constant representing a shard decision where no decision was taken */
    public static final UnassignedShardDecision DECISION_NOT_TAKEN =
        new UnassignedShardDecision(null, null, null, null, null, null);

    @Nullable
    private final Decision finalDecision;
    @Nullable
    private final AllocationStatus allocationStatus;
    @Nullable
    private final String finalExplanation;
    @Nullable
    private final String assignedNodeId;
    @Nullable
    private final String allocationId;
    @Nullable
    private final Map<String, Decision> nodeDecisions;

    private UnassignedShardDecision(Decision finalDecision,
                                    AllocationStatus allocationStatus,
                                    String finalExplanation,
                                    String assignedNodeId,
                                    String allocationId,
                                    Map<String, Decision> nodeDecisions) {
        assert finalExplanation != null || finalDecision == null :
            "if a decision was taken, there must be an explanation for it";
        assert assignedNodeId != null || finalDecision == null || finalDecision.type() != Type.YES :
            "a yes decision must have a node to assign the shard to";
        assert allocationStatus != null || finalDecision == null || finalDecision.type() == Type.YES :
            "only a yes decision should not have an allocation status";
        assert allocationId == null || assignedNodeId != null :
            "allocation id can only be null if the assigned node is null";
        this.finalDecision = finalDecision;
        this.allocationStatus = allocationStatus;
        this.finalExplanation = finalExplanation;
        this.assignedNodeId = assignedNodeId;
        this.allocationId = allocationId;
        this.nodeDecisions = nodeDecisions != null ? Collections.unmodifiableMap(nodeDecisions) : null;
    }

    /**
     * Creates a NO decision with the given {@link AllocationStatus} and explanation for the NO decision.
     */
    public static UnassignedShardDecision noDecision(AllocationStatus allocationStatus, String explanation) {
        return noDecision(allocationStatus, explanation, null);
    }

    /**
     * Creates a NO decision with the given {@link AllocationStatus} and explanation for the NO decision,
     * as well as the individual node-level decisions that comprised the final NO decision.
     */
    public static UnassignedShardDecision noDecision(AllocationStatus allocationStatus,
                                                     String explanation,
                                                     @Nullable Map<String, Decision> nodeDecisions) {
        Objects.requireNonNull(explanation, "explanation must not be null");
        Objects.requireNonNull(allocationStatus, "allocationStatus must not be null");
        return new UnassignedShardDecision(Decision.NO, allocationStatus, explanation, null, null, nodeDecisions);
    }

    /**
     * Creates a THROTTLE decision with the given explanation and individual node-level decisions that
     * comprised the final THROTTLE decision.
     */
    public static UnassignedShardDecision throttleDecision(String explanation,
                                                           Map<String, Decision> nodeDecisions) {
        Objects.requireNonNull(explanation, "explanation must not be null");
        return new UnassignedShardDecision(Decision.THROTTLE, AllocationStatus.DECIDERS_THROTTLED, explanation, null, null,
                                           nodeDecisions);
    }

    /**
     * Creates a YES decision with the given explanation and individual node-level decisions that
     * comprised the final YES decision, along with the node id to which the shard is assigned and
     * the allocation id for the shard, if available.
     */
    public static UnassignedShardDecision yesDecision(String explanation,
                                                      String assignedNodeId,
                                                      @Nullable String allocationId,
                                                      Map<String, Decision> nodeDecisions) {
        Objects.requireNonNull(explanation, "explanation must not be null");
        Objects.requireNonNull(assignedNodeId, "assignedNodeId must not be null");
        return new UnassignedShardDecision(Decision.YES, null, explanation, assignedNodeId, allocationId, nodeDecisions);
    }

    /**
     * Returns <code>true</code> if a decision was taken by the allocator, {@code false} otherwise.
     * If no decision was taken, then the rest of the fields in this object are meaningless and return {@code null}.
     */
    public boolean isDecisionTaken() {
        return finalDecision != null;
    }

    /**
     * Returns the final decision made by the allocator on whether to assign the unassigned shard.
     * This value can only be {@code null} if {@link #isDecisionTaken()} returns {@code false}.
     */
    @Nullable
    public Decision getFinalDecision() {
        return finalDecision;
    }

    /**
     * Returns the final decision made by the allocator on whether to assign the unassigned shard.
     * Only call this method if {@link #isDecisionTaken()} returns {@code true}, otherwise it will
     * throw an {@code IllegalArgumentException}.
     */
    public Decision getFinalDecisionSafe() {
        if (isDecisionTaken() == false) {
            throw new IllegalArgumentException("decision must have been taken in order to return the final decision");
        }
        return finalDecision;
    }

    /**
     * Returns the status of an unsuccessful allocation attempt. This value will be {@code null} if
     * no decision was taken or if the decision was {@link Decision.Type#YES}.
     */
    @Nullable
    public AllocationStatus getAllocationStatus() {
        return allocationStatus;
    }

    /**
     * Returns the free-text explanation for the reason behind the decision taken in {@link #getFinalDecision()}.
     */
    @Nullable
    public String getFinalExplanation() {
        return finalExplanation;
    }

    /**
     * Returns the free-text explanation for the reason behind the decision taken in {@link #getFinalDecision()}.
     * Only call this method if {@link #isDecisionTaken()} returns {@code true}, otherwise it will
     * throw an {@code IllegalArgumentException}.
     */
    public String getFinalExplanationSafe() {
        if (isDecisionTaken() == false) {
            throw new IllegalArgumentException("decision must have been taken in order to return the final explanation");
        }
        return finalExplanation;
    }

    /**
     * Get the node id that the allocator will assign the shard to, unless {@link #getFinalDecision()} returns
     * a value other than {@link Decision.Type#YES}, in which case this returns {@code null}.
     */
    @Nullable
    public String getAssignedNodeId() {
        return assignedNodeId;
    }

    /**
     * Gets the allocation id for the existing shard copy that the allocator is assigning the shard to.
     * This method returns a non-null value iff {@link #getAssignedNodeId()} returns a non-null value
     * and the node on which the shard is assigned already has a shard copy with an in-sync allocation id
     * that we can re-use.
     */
    @Nullable
    public String getAllocationId() {
        return allocationId;
    }

    /**
     * Gets the individual node-level decisions that went into making the final decision as represented by
     * {@link #getFinalDecision()}. The map that is returned has the node id as the key and a {@link Decision}
     * as the decision for the given node.
     */
    @Nullable
    public Map<String, Decision> getNodeDecisions() {
        return nodeDecisions;
    }
}
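
As a quick orientation to the factory methods above (a sketch only; the node id, allocation id, and explanation strings are invented), each decision type leaves a different set of fields non-null:

Map<String, Decision> nodeDecisions = new HashMap<>();  // node id -> decider verdict for that node
nodeDecisions.put("node1", Decision.NO);

UnassignedShardDecision no = UnassignedShardDecision.noDecision(
    AllocationStatus.NO_VALID_SHARD_COPY, "no valid shard copy was found", nodeDecisions);
UnassignedShardDecision throttle = UnassignedShardDecision.throttleDecision(
    "recoveries are currently throttled", nodeDecisions);
UnassignedShardDecision yes = UnassignedShardDecision.yesDecision(
    "found an in-sync shard copy", "node1", "some-allocation-id", nodeDecisions);

// NO and THROTTLE carry an AllocationStatus but no assigned node; YES carries a node (and optionally an allocation id)
assert no.getAssignedNodeId() == null && no.getAllocationStatus() == AllocationStatus.NO_VALID_SHARD_COPY;
assert throttle.getAllocationStatus() == AllocationStatus.DECIDERS_THROTTLED;
assert yes.getAssignedNodeId().equals("node1") && yes.getAllocationStatus() == null;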
@@ -0,0 +1,88 @@
/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.gateway;

import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.UnassignedShardDecision;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;

/**
 * An abstract class that implements basic functionality for allocating
 * shards to nodes based on shard copies that already exist in the cluster.
 *
 * Individual implementations of this class are responsible for providing
 * the logic to determine to which nodes (if any) those shards are allocated.
 */
public abstract class BaseGatewayShardAllocator extends AbstractComponent {

    public BaseGatewayShardAllocator(Settings settings) {
        super(settings);
    }

    /**
     * Allocate unassigned shards to nodes (if any) where valid copies of the shard already exist.
     * It is up to the individual implementations of {@link #makeAllocationDecision(ShardRouting, RoutingAllocation, Logger)}
     * to make decisions on assigning shards to nodes.
     *
     * @param allocation the allocation state container object
     */
    public void allocateUnassigned(RoutingAllocation allocation) {
        final RoutingNodes routingNodes = allocation.routingNodes();
        final RoutingNodes.UnassignedShards.UnassignedIterator unassignedIterator = routingNodes.unassigned().iterator();
        while (unassignedIterator.hasNext()) {
            final ShardRouting shard = unassignedIterator.next();
            final UnassignedShardDecision unassignedShardDecision = makeAllocationDecision(shard, allocation, logger);

            if (unassignedShardDecision.isDecisionTaken() == false) {
                // no decision was taken by this allocator
                continue;
            }

            if (unassignedShardDecision.getFinalDecisionSafe().type() == Decision.Type.YES) {
                unassignedIterator.initialize(unassignedShardDecision.getAssignedNodeId(),
                    unassignedShardDecision.getAllocationId(),
                    shard.primary() ? ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE :
                        allocation.clusterInfo().getShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE),
                    allocation.changes());
            } else {
                unassignedIterator.removeAndIgnore(unassignedShardDecision.getAllocationStatus(), allocation.changes());
            }
        }
    }

    /**
     * Make a decision on the allocation of an unassigned shard. This method is used by
     * {@link #allocateUnassigned(RoutingAllocation)} to make decisions about whether or not
     * the shard can be allocated by this allocator and if so, to which node it will be allocated.
     *
     * @param unassignedShard the unassigned shard to allocate
     * @param allocation the current routing state
     * @param logger the logger
     * @return an {@link UnassignedShardDecision} with the final decision of whether to allocate and details of the decision
     */
    public abstract UnassignedShardDecision makeAllocationDecision(ShardRouting unassignedShard,
                                                                   RoutingAllocation allocation,
                                                                   Logger logger);
}
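
A minimal subclass sketch (hypothetical, for orientation only; imports follow the files above plus UnassignedInfo.AllocationStatus) showing the contract that PrimaryShardAllocator and ReplicaShardAllocator follow further down: decide, but do not touch the routing table; the base class's allocateUnassigned() applies the result:

public class ExampleGatewayShardAllocator extends BaseGatewayShardAllocator {

    public ExampleGatewayShardAllocator(Settings settings) {
        super(settings);
    }

    @Override
    public UnassignedShardDecision makeAllocationDecision(ShardRouting unassignedShard,
                                                          RoutingAllocation allocation,
                                                          Logger logger) {
        if (unassignedShard.primary() == false) {
            // not this allocator's responsibility; leave the shard for another allocator
            return UnassignedShardDecision.DECISION_NOT_TAKEN;
        }
        // purely illustrative: a real implementation consults fetched shard state and the allocation deciders
        return UnassignedShardDecision.noDecision(AllocationStatus.FETCHING_SHARD_DATA,
            "still fetching shard state from the nodes in the cluster");
    }
}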
@@ -19,12 +19,12 @@

package org.elasticsearch.gateway;

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.RoutingNode;
@@ -32,19 +32,23 @@ import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo.AllocationStatus;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.UnassignedShardDecision;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.cluster.routing.allocation.decider.Decision.Type;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.gateway.AsyncShardFetch.FetchResult;
import org.elasticsearch.gateway.TransportNodesListGatewayStartedShards.NodeGatewayStartedShards;
import org.elasticsearch.index.shard.ShardStateMetaData;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -62,7 +66,7 @@ import java.util.stream.Collectors;
 * nor does it allocate primaries when a primary shard failed and there is a valid replica
 * copy that can immediately be promoted to primary, as this takes place in {@link RoutingNodes#failShard}.
 */
public abstract class PrimaryShardAllocator extends AbstractComponent {
public abstract class PrimaryShardAllocator extends BaseGatewayShardAllocator {

    private static final Function<String, String> INITIAL_SHARDS_PARSER = (value) -> {
        switch (value) {
@@ -94,110 +98,161 @@ public abstract class PrimaryShardAllocator extends AbstractComponent {
        logger.debug("using initial_shards [{}]", NODE_INITIAL_SHARDS_SETTING.get(settings));
    }

    public void allocateUnassigned(RoutingAllocation allocation) {
        final RoutingNodes routingNodes = allocation.routingNodes();
        final MetaData metaData = allocation.metaData();

        final RoutingNodes.UnassignedShards.UnassignedIterator unassignedIterator = routingNodes.unassigned().iterator();
        while (unassignedIterator.hasNext()) {
            final ShardRouting shard = unassignedIterator.next();

            if (shard.primary() == false) {
                continue;
    /**
     * Is the allocator responsible for allocating the given {@link ShardRouting}?
     */
    private static boolean isResponsibleFor(final ShardRouting shard) {
        return shard.primary() // must be primary
            && shard.unassigned() // must be unassigned
            // only handle either an existing store or a snapshot recovery
            && (shard.recoverySource().getType() == RecoverySource.Type.EXISTING_STORE
                || shard.recoverySource().getType() == RecoverySource.Type.SNAPSHOT);
    }

            if (shard.recoverySource().getType() != RecoverySource.Type.EXISTING_STORE &&
                shard.recoverySource().getType() != RecoverySource.Type.SNAPSHOT) {
                continue;
    @Override
    public UnassignedShardDecision makeAllocationDecision(final ShardRouting unassignedShard,
                                                          final RoutingAllocation allocation,
                                                          final Logger logger) {
        if (isResponsibleFor(unassignedShard) == false) {
            // this allocator is not responsible for allocating this shard
            return UnassignedShardDecision.DECISION_NOT_TAKEN;
        }

        final AsyncShardFetch.FetchResult<NodeGatewayStartedShards> shardState = fetchData(shard, allocation);
        final boolean explain = allocation.debugDecision();
        final FetchResult<NodeGatewayStartedShards> shardState = fetchData(unassignedShard, allocation);
        if (shardState.hasData() == false) {
            logger.trace("{}: ignoring allocation, still fetching shard started state", shard);
            allocation.setHasPendingAsyncFetch();
            unassignedIterator.removeAndIgnore(AllocationStatus.FETCHING_SHARD_DATA, allocation.changes());
            continue;
            return UnassignedShardDecision.noDecision(AllocationStatus.FETCHING_SHARD_DATA,
                "still fetching shard state from the nodes in the cluster");
        }

        // don't create a new IndexSetting object for every shard as this could cause a lot of garbage
        // on cluster restart if we allocate a boat load of shards
        final IndexMetaData indexMetaData = metaData.getIndexSafe(shard.index());
        final Set<String> inSyncAllocationIds = indexMetaData.inSyncAllocationIds(shard.id());
        final boolean snapshotRestore = shard.recoverySource().getType() == RecoverySource.Type.SNAPSHOT;
        final IndexMetaData indexMetaData = allocation.metaData().getIndexSafe(unassignedShard.index());
        final Set<String> inSyncAllocationIds = indexMetaData.inSyncAllocationIds(unassignedShard.id());
        final boolean snapshotRestore = unassignedShard.recoverySource().getType() == RecoverySource.Type.SNAPSHOT;
        final boolean recoverOnAnyNode = recoverOnAnyNode(indexMetaData);

        final NodeShardsResult nodeShardsResult;
        final boolean enoughAllocationsFound;

        if (inSyncAllocationIds.isEmpty()) {
            assert Version.indexCreated(indexMetaData.getSettings()).before(Version.V_5_0_0_alpha1) : "trying to allocated a primary with an empty allocation id set, but index is new";
            assert Version.indexCreated(indexMetaData.getSettings()).before(Version.V_5_0_0_alpha1) :
                "trying to allocated a primary with an empty allocation id set, but index is new";
            // when we load an old index (after upgrading cluster) or restore a snapshot of an old index
            // fall back to old version-based allocation mode
            // Note that once the shard has been active, lastActiveAllocationIds will be non-empty
            nodeShardsResult = buildVersionBasedNodeShardsResult(shard, snapshotRestore || recoverOnAnyNode, allocation.getIgnoreNodes(shard.shardId()), shardState);
            nodeShardsResult = buildVersionBasedNodeShardsResult(unassignedShard, snapshotRestore || recoverOnAnyNode,
                allocation.getIgnoreNodes(unassignedShard.shardId()), shardState, logger);
            if (snapshotRestore || recoverOnAnyNode) {
                enoughAllocationsFound = nodeShardsResult.allocationsFound > 0;
            } else {
                enoughAllocationsFound = isEnoughVersionBasedAllocationsFound(indexMetaData, nodeShardsResult);
            }
            logger.debug("[{}][{}]: version-based allocation for pre-{} index found {} allocations of {}", shard.index(), shard.id(), Version.V_5_0_0_alpha1, nodeShardsResult.allocationsFound, shard);
            logger.debug("[{}][{}]: version-based allocation for pre-{} index found {} allocations of {}", unassignedShard.index(),
                unassignedShard.id(), Version.V_5_0_0_alpha1, nodeShardsResult.allocationsFound, unassignedShard);
        } else {
            assert inSyncAllocationIds.isEmpty() == false;
            // use allocation ids to select nodes
            nodeShardsResult = buildAllocationIdBasedNodeShardsResult(shard, snapshotRestore || recoverOnAnyNode,
                allocation.getIgnoreNodes(shard.shardId()), inSyncAllocationIds, shardState);
            nodeShardsResult = buildAllocationIdBasedNodeShardsResult(unassignedShard, snapshotRestore || recoverOnAnyNode,
                allocation.getIgnoreNodes(unassignedShard.shardId()), inSyncAllocationIds, shardState, logger);
            enoughAllocationsFound = nodeShardsResult.orderedAllocationCandidates.size() > 0;
            logger.debug("[{}][{}]: found {} allocation candidates of {} based on allocation ids: [{}]", shard.index(), shard.id(), nodeShardsResult.orderedAllocationCandidates.size(), shard, inSyncAllocationIds);
            logger.debug("[{}][{}]: found {} allocation candidates of {} based on allocation ids: [{}]", unassignedShard.index(),
                unassignedShard.id(), nodeShardsResult.orderedAllocationCandidates.size(), unassignedShard, inSyncAllocationIds);
        }

        if (enoughAllocationsFound == false){
        if (enoughAllocationsFound == false) {
            if (snapshotRestore) {
                // let BalancedShardsAllocator take care of allocating this shard
                logger.debug("[{}][{}]: missing local data, will restore from [{}]", shard.index(), shard.id(), shard.recoverySource());
                logger.debug("[{}][{}]: missing local data, will restore from [{}]",
                    unassignedShard.index(), unassignedShard.id(), unassignedShard.recoverySource());
                return UnassignedShardDecision.DECISION_NOT_TAKEN;
            } else if (recoverOnAnyNode) {
                // let BalancedShardsAllocator take care of allocating this shard
                logger.debug("[{}][{}]: missing local data, recover from any node", shard.index(), shard.id());
                logger.debug("[{}][{}]: missing local data, recover from any node", unassignedShard.index(), unassignedShard.id());
                return UnassignedShardDecision.DECISION_NOT_TAKEN;
            } else {
                // we can't really allocate, so ignore it and continue
                unassignedIterator.removeAndIgnore(AllocationStatus.NO_VALID_SHARD_COPY, allocation.changes());
                logger.debug("[{}][{}]: not allocating, number_of_allocated_shards_found [{}]", shard.index(), shard.id(), nodeShardsResult.allocationsFound);
                // We have a shard that was previously allocated, but we could not find a valid shard copy to allocate the primary.
                // We could just be waiting for the node that holds the primary to start back up, in which case the allocation for
                // this shard will be picked up when the node joins and we do another allocation reroute
                logger.debug("[{}][{}]: not allocating, number_of_allocated_shards_found [{}]",
                    unassignedShard.index(), unassignedShard.id(), nodeShardsResult.allocationsFound);
                return UnassignedShardDecision.noDecision(AllocationStatus.NO_VALID_SHARD_COPY,
                    "shard was previously allocated, but no valid shard copy could be found amongst the current nodes in the cluster");
            }
            continue;
        }

        final NodesToAllocate nodesToAllocate = buildNodesToAllocate(
            allocation, nodeShardsResult.orderedAllocationCandidates, shard, false
            allocation, nodeShardsResult.orderedAllocationCandidates, unassignedShard, false
        );
        if (nodesToAllocate.yesNodeShards.isEmpty() == false) {
            NodeGatewayStartedShards nodeShardState = nodesToAllocate.yesNodeShards.get(0);
            logger.debug("[{}][{}]: allocating [{}] to [{}] on primary allocation", shard.index(), shard.id(), shard, nodeShardState.getNode());
            unassignedIterator.initialize(nodeShardState.getNode().getId(), nodeShardState.allocationId(), ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE, allocation.changes());
            DecidedNode decidedNode = nodesToAllocate.yesNodeShards.get(0);
            logger.debug("[{}][{}]: allocating [{}] to [{}] on primary allocation",
                unassignedShard.index(), unassignedShard.id(), unassignedShard, decidedNode.nodeShardState.getNode());
            final String nodeId = decidedNode.nodeShardState.getNode().getId();
            return UnassignedShardDecision.yesDecision(
                "the allocation deciders returned a YES decision to allocate to node [" + nodeId + "]",
                nodeId, decidedNode.nodeShardState.allocationId(), buildNodeDecisions(nodesToAllocate, explain));
        } else if (nodesToAllocate.throttleNodeShards.isEmpty() == true && nodesToAllocate.noNodeShards.isEmpty() == false) {
            // The deciders returned a NO decision for all nodes with shard copies, so we check if primary shard
            // can be force-allocated to one of the nodes.
            final NodesToAllocate nodesToForceAllocate = buildNodesToAllocate(
                allocation, nodeShardsResult.orderedAllocationCandidates, shard, true
                allocation, nodeShardsResult.orderedAllocationCandidates, unassignedShard, true
            );
            if (nodesToForceAllocate.yesNodeShards.isEmpty() == false) {
                NodeGatewayStartedShards nodeShardState = nodesToForceAllocate.yesNodeShards.get(0);
                final DecidedNode decidedNode = nodesToForceAllocate.yesNodeShards.get(0);
                final NodeGatewayStartedShards nodeShardState = decidedNode.nodeShardState;
                logger.debug("[{}][{}]: allocating [{}] to [{}] on forced primary allocation",
                    shard.index(), shard.id(), shard, nodeShardState.getNode());
                unassignedIterator.initialize(nodeShardState.getNode().getId(), nodeShardState.allocationId(),
                    ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE, allocation.changes());
                    unassignedShard.index(), unassignedShard.id(), unassignedShard, nodeShardState.getNode());
                final String nodeId = nodeShardState.getNode().getId();
                return UnassignedShardDecision.yesDecision(
                    "allocating the primary shard to node [" + nodeId + "], which has a complete copy of the shard data",
                    nodeId,
                    nodeShardState.allocationId(),
                    buildNodeDecisions(nodesToForceAllocate, explain));
            } else if (nodesToForceAllocate.throttleNodeShards.isEmpty() == false) {
                logger.debug("[{}][{}]: throttling allocation [{}] to [{}] on forced primary allocation",
                    shard.index(), shard.id(), shard, nodesToForceAllocate.throttleNodeShards);
                unassignedIterator.removeAndIgnore(AllocationStatus.DECIDERS_THROTTLED, allocation.changes());
                    unassignedShard.index(), unassignedShard.id(), unassignedShard, nodesToForceAllocate.throttleNodeShards);
                return UnassignedShardDecision.throttleDecision(
                    "allocation throttled as all nodes to which the shard may be force allocated are busy with other recoveries",
                    buildNodeDecisions(nodesToForceAllocate, explain));
            } else {
                logger.debug("[{}][{}]: forced primary allocation denied [{}]", shard.index(), shard.id(), shard);
                unassignedIterator.removeAndIgnore(AllocationStatus.DECIDERS_NO, allocation.changes());
                logger.debug("[{}][{}]: forced primary allocation denied [{}]",
                    unassignedShard.index(), unassignedShard.id(), unassignedShard);
                return UnassignedShardDecision.noDecision(AllocationStatus.DECIDERS_NO,
                    "all nodes that hold a valid shard copy returned a NO decision, and force allocation is not permitted",
                    buildNodeDecisions(nodesToForceAllocate, explain));
            }
        } else {
            // we are throttling this, but we have enough to allocate to this node, ignore it for now
            logger.debug("[{}][{}]: throttling allocation [{}] to [{}] on primary allocation", shard.index(), shard.id(), shard, nodesToAllocate.throttleNodeShards);
            unassignedIterator.removeAndIgnore(AllocationStatus.DECIDERS_THROTTLED, allocation.changes());
            // we are throttling this, since we are allowed to allocate to this node but there are enough allocations
            // taking place on the node currently, ignore it for now
            logger.debug("[{}][{}]: throttling allocation [{}] to [{}] on primary allocation",
                unassignedShard.index(), unassignedShard.id(), unassignedShard, nodesToAllocate.throttleNodeShards);
            return UnassignedShardDecision.throttleDecision(
                "allocation throttled as all nodes to which the shard may be allocated are busy with other recoveries",
                buildNodeDecisions(nodesToAllocate, explain));
        }
    }

    /**
     * Builds a map of nodes to the corresponding allocation decisions for those nodes.
     */
    private static Map<String, Decision> buildNodeDecisions(NodesToAllocate nodesToAllocate, boolean explain) {
        if (explain == false) {
            // not in explain mode, no need to return node level decisions
            return null;
        }
        Map<String, Decision> nodeDecisions = new LinkedHashMap<>();
        for (final DecidedNode decidedNode : nodesToAllocate.yesNodeShards) {
            nodeDecisions.put(decidedNode.nodeShardState.getNode().getId(), decidedNode.decision);
        }
        for (final DecidedNode decidedNode : nodesToAllocate.throttleNodeShards) {
            nodeDecisions.put(decidedNode.nodeShardState.getNode().getId(), decidedNode.decision);
        }
        for (final DecidedNode decidedNode : nodesToAllocate.noNodeShards) {
            nodeDecisions.put(decidedNode.nodeShardState.getNode().getId(), decidedNode.decision);
        }
        return nodeDecisions;
    }

    /**
@@ -205,8 +260,10 @@ public abstract class PrimaryShardAllocator extends AbstractComponent {
     * lastActiveAllocationIds are added to the list. Otherwise, any node that has a shard is added to the list, but
     * entries with matching allocation id are always at the front of the list.
     */
    protected NodeShardsResult buildAllocationIdBasedNodeShardsResult(ShardRouting shard, boolean matchAnyShard, Set<String> ignoreNodes,
                                                                      Set<String> lastActiveAllocationIds, AsyncShardFetch.FetchResult<NodeGatewayStartedShards> shardState) {
    protected static NodeShardsResult buildAllocationIdBasedNodeShardsResult(ShardRouting shard, boolean matchAnyShard,
                                                                             Set<String> ignoreNodes, Set<String> lastActiveAllocationIds,
                                                                             FetchResult<NodeGatewayStartedShards> shardState,
                                                                             Logger logger) {
        LinkedList<NodeGatewayStartedShards> matchingNodeShardStates = new LinkedList<>();
        LinkedList<NodeGatewayStartedShards> nonMatchingNodeShardStates = new LinkedList<>();
        int numberOfAllocationsFound = 0;
@@ -299,9 +356,9 @@ public abstract class PrimaryShardAllocator extends AbstractComponent {
                                              List<NodeGatewayStartedShards> nodeShardStates,
                                              ShardRouting shardRouting,
                                              boolean forceAllocate) {
        List<NodeGatewayStartedShards> yesNodeShards = new ArrayList<>();
        List<NodeGatewayStartedShards> throttledNodeShards = new ArrayList<>();
        List<NodeGatewayStartedShards> noNodeShards = new ArrayList<>();
        List<DecidedNode> yesNodeShards = new ArrayList<>();
        List<DecidedNode> throttledNodeShards = new ArrayList<>();
        List<DecidedNode> noNodeShards = new ArrayList<>();
        for (NodeGatewayStartedShards nodeShardState : nodeShardStates) {
            RoutingNode node = allocation.routingNodes().node(nodeShardState.getNode().getId());
            if (node == null) {
@@ -310,12 +367,13 @@ public abstract class PrimaryShardAllocator extends AbstractComponent {

            Decision decision = forceAllocate ? allocation.deciders().canForceAllocatePrimary(shardRouting, node, allocation) :
                allocation.deciders().canAllocate(shardRouting, node, allocation);
            if (decision.type() == Decision.Type.THROTTLE) {
                throttledNodeShards.add(nodeShardState);
            } else if (decision.type() == Decision.Type.NO) {
                noNodeShards.add(nodeShardState);
            DecidedNode decidedNode = new DecidedNode(nodeShardState, decision);
            if (decision.type() == Type.THROTTLE) {
                throttledNodeShards.add(decidedNode);
            } else if (decision.type() == Type.NO) {
                noNodeShards.add(decidedNode);
            } else {
                yesNodeShards.add(nodeShardState);
                yesNodeShards.add(decidedNode);
            }
        }
        return new NodesToAllocate(Collections.unmodifiableList(yesNodeShards), Collections.unmodifiableList(throttledNodeShards), Collections.unmodifiableList(noNodeShards));
@@ -325,8 +383,8 @@ public abstract class PrimaryShardAllocator extends AbstractComponent {
     * Builds a list of previously started shards. If matchAnyShard is set to false, only shards with the highest shard version are added to
     * the list. Otherwise, any existing shard is added to the list, but entries with highest version are always at the front of the list.
     */
    NodeShardsResult buildVersionBasedNodeShardsResult(ShardRouting shard, boolean matchAnyShard, Set<String> ignoreNodes,
                                                       AsyncShardFetch.FetchResult<NodeGatewayStartedShards> shardState) {
    static NodeShardsResult buildVersionBasedNodeShardsResult(ShardRouting shard, boolean matchAnyShard, Set<String> ignoreNodes,
                                                              FetchResult<NodeGatewayStartedShards> shardState, Logger logger) {
        final List<NodeGatewayStartedShards> allocationCandidates = new ArrayList<>();
        int numberOfAllocationsFound = 0;
        long highestVersion = ShardStateMetaData.NO_VERSION;
@@ -400,7 +458,7 @@ public abstract class PrimaryShardAllocator extends AbstractComponent {
            && IndexMetaData.INDEX_SHARED_FS_ALLOW_RECOVERY_ON_ANY_NODE_SETTING.get(metaData.getSettings(), this.settings);
    }

    protected abstract AsyncShardFetch.FetchResult<NodeGatewayStartedShards> fetchData(ShardRouting shard, RoutingAllocation allocation);
    protected abstract FetchResult<NodeGatewayStartedShards> fetchData(ShardRouting shard, RoutingAllocation allocation);

    static class NodeShardsResult {
        public final List<NodeGatewayStartedShards> orderedAllocationCandidates;
@@ -413,16 +471,28 @@ public abstract class PrimaryShardAllocator extends AbstractComponent {
    }

    static class NodesToAllocate {
        final List<NodeGatewayStartedShards> yesNodeShards;
        final List<NodeGatewayStartedShards> throttleNodeShards;
        final List<NodeGatewayStartedShards> noNodeShards;
        final List<DecidedNode> yesNodeShards;
        final List<DecidedNode> throttleNodeShards;
        final List<DecidedNode> noNodeShards;

        public NodesToAllocate(List<NodeGatewayStartedShards> yesNodeShards,
                               List<NodeGatewayStartedShards> throttleNodeShards,
                               List<NodeGatewayStartedShards> noNodeShards) {
        public NodesToAllocate(List<DecidedNode> yesNodeShards, List<DecidedNode> throttleNodeShards, List<DecidedNode> noNodeShards) {
            this.yesNodeShards = yesNodeShards;
            this.throttleNodeShards = throttleNodeShards;
            this.noNodeShards = noNodeShards;
        }
    }

    /**
     * This class encapsulates the shard state retrieved from a node and the decision that was made
     * by the allocator for allocating to the node that holds the shard copy.
     */
    private static class DecidedNode {
        final NodeGatewayStartedShards nodeShardState;
        final Decision decision;

        private DecidedNode(NodeGatewayStartedShards nodeShardState, Decision decision) {
            this.nodeShardState = nodeShardState;
            this.decision = decision;
        }
    }
}
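
Two design points in the hunks above are easy to miss: DecidedNode pairs the fetched shard state with the decider verdict for that node, and buildNodeDecisions() only materializes the per-node map when allocation.debugDecision() is true, so the ordinary allocation path does not build explanation data it will never use. A condensed sketch of that gating (variable names here are illustrative, not a new API):

boolean explain = allocation.debugDecision();
Map<String, Decision> nodeDecisions = explain ? new LinkedHashMap<>() : null;  // skipped entirely on the hot path
for (DecidedNode decidedNode : nodesToAllocate.yesNodeShards) {
    if (explain) {
        nodeDecisions.put(decidedNode.nodeShardState.getNode().getId(), decidedNode.decision);
    }
}
// the map (or null) is then threaded into UnassignedShardDecision.yesDecision/throttleDecision/noDecision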
@@ -23,7 +23,7 @@ import com.carrotsearch.hppc.ObjectLongHashMap;
import com.carrotsearch.hppc.ObjectLongMap;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectLongCursor;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingNode;
@@ -31,24 +31,25 @@ import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.UnassignedInfo.AllocationStatus;
import org.elasticsearch.cluster.routing.RoutingChangesObserver;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.UnassignedShardDecision;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
 */
public abstract class ReplicaShardAllocator extends AbstractComponent {
public abstract class ReplicaShardAllocator extends BaseGatewayShardAllocator {

    public ReplicaShardAllocator(Settings settings) {
        super(settings);
@@ -96,7 +97,7 @@ public abstract class ReplicaShardAllocator extends AbstractComponent {
                continue;
            }

            MatchingNodes matchingNodes = findMatchingNodes(shard, allocation, primaryStore, shardStores);
            MatchingNodes matchingNodes = findMatchingNodes(shard, allocation, primaryStore, shardStores, false);
            if (matchingNodes.getNodeWithHighestMatch() != null) {
                DiscoveryNode currentNode = allocation.nodes().get(shard.currentNodeId());
                DiscoveryNode nodeWithHighestMatch = matchingNodes.getNodeWithHighestMatch();
@@ -128,37 +129,45 @@ public abstract class ReplicaShardAllocator extends AbstractComponent {
        }
    }

    public void allocateUnassigned(RoutingAllocation allocation) {
        final RoutingNodes routingNodes = allocation.routingNodes();
        final RoutingNodes.UnassignedShards.UnassignedIterator unassignedIterator = routingNodes.unassigned().iterator();
        while (unassignedIterator.hasNext()) {
            ShardRouting shard = unassignedIterator.next();
            if (shard.primary()) {
                continue;
            }

    /**
     * Is the allocator responsible for allocating the given {@link ShardRouting}?
     */
    private static boolean isResponsibleFor(final ShardRouting shard) {
        return shard.primary() == false // must be a replica
            && shard.unassigned() // must be unassigned
            // if we are allocating a replica because of index creation, no need to go and find a copy, there isn't one...
            if (shard.unassignedInfo().getReason() == UnassignedInfo.Reason.INDEX_CREATED) {
                continue;
            && shard.unassignedInfo().getReason() != UnassignedInfo.Reason.INDEX_CREATED;
    }

    @Override
    public UnassignedShardDecision makeAllocationDecision(final ShardRouting unassignedShard,
                                                          final RoutingAllocation allocation,
                                                          final Logger logger) {
        if (isResponsibleFor(unassignedShard) == false) {
            // this allocator is not responsible for deciding on this shard
            return UnassignedShardDecision.DECISION_NOT_TAKEN;
        }

        final RoutingNodes routingNodes = allocation.routingNodes();
        final boolean explain = allocation.debugDecision();
        // pre-check if it can be allocated to any node that currently exists, so we won't list the store for it for nothing
        Decision decision = canBeAllocatedToAtLeastOneNode(shard, allocation);
        if (decision.type() != Decision.Type.YES) {
            logger.trace("{}: ignoring allocation, can't be allocated on any node", shard);
            unassignedIterator.removeAndIgnore(UnassignedInfo.AllocationStatus.fromDecision(decision), allocation.changes());
            continue;
        Tuple<Decision, Map<String, Decision>> allocateDecision = canBeAllocatedToAtLeastOneNode(unassignedShard, allocation, explain);
        if (allocateDecision.v1().type() != Decision.Type.YES) {
            logger.trace("{}: ignoring allocation, can't be allocated on any node", unassignedShard);
            return UnassignedShardDecision.noDecision(UnassignedInfo.AllocationStatus.fromDecision(allocateDecision.v1()),
                "all nodes returned a " + allocateDecision.v1().type() + " decision for allocating the replica shard",
                allocateDecision.v2());
        }

        AsyncShardFetch.FetchResult<TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData> shardStores = fetchData(shard, allocation);
        AsyncShardFetch.FetchResult<TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData> shardStores = fetchData(unassignedShard, allocation);
        if (shardStores.hasData() == false) {
            logger.trace("{}: ignoring allocation, still fetching shard stores", shard);
            logger.trace("{}: ignoring allocation, still fetching shard stores", unassignedShard);
            allocation.setHasPendingAsyncFetch();
            unassignedIterator.removeAndIgnore(AllocationStatus.FETCHING_SHARD_DATA, allocation.changes());
            continue; // still fetching
            return UnassignedShardDecision.noDecision(AllocationStatus.FETCHING_SHARD_DATA,
                "still fetching shard state from the nodes in the cluster");
        }

        ShardRouting primaryShard = routingNodes.activePrimary(shard.shardId());
        ShardRouting primaryShard = routingNodes.activePrimary(unassignedShard.shardId());
        assert primaryShard != null : "the replica shard can be allocated on at least one node, so there must be an active primary";
        TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore = findStore(primaryShard, allocation, shardStores);
        if (primaryStore == null) {
@@ -166,48 +175,42 @@ public abstract class ReplicaShardAllocator extends AbstractComponent {
            // we want to let the replica be allocated in order to expose the actual problem with the primary that the replica
            // will try and recover from
            // Note, this is the existing behavior, as exposed in running CorruptFileTest#testNoPrimaryData
            logger.trace("{}: no primary shard store found or allocated, letting actual allocation figure it out", shard);
            continue;
            logger.trace("{}: no primary shard store found or allocated, letting actual allocation figure it out", unassignedShard);
            return UnassignedShardDecision.DECISION_NOT_TAKEN;
        }

        MatchingNodes matchingNodes = findMatchingNodes(shard, allocation, primaryStore, shardStores);
        MatchingNodes matchingNodes = findMatchingNodes(unassignedShard, allocation, primaryStore, shardStores, explain);
        assert explain == false || matchingNodes.nodeDecisions != null : "in explain mode, we must have individual node decisions";

        if (matchingNodes.getNodeWithHighestMatch() != null) {
            RoutingNode nodeWithHighestMatch = allocation.routingNodes().node(matchingNodes.getNodeWithHighestMatch().getId());
            // we only check on THROTTLE since we checked before before on NO
            decision = allocation.deciders().canAllocate(shard, nodeWithHighestMatch, allocation);
            Decision decision = allocation.deciders().canAllocate(unassignedShard, nodeWithHighestMatch, allocation);
            if (decision.type() == Decision.Type.THROTTLE) {
                logger.debug("[{}][{}]: throttling allocation [{}] to [{}] in order to reuse its unallocated persistent store", shard.index(), shard.id(), shard, nodeWithHighestMatch.node());
                // we are throttling this, but we have enough to allocate to this node, ignore it for now
                unassignedIterator.removeAndIgnore(UnassignedInfo.AllocationStatus.fromDecision(decision), allocation.changes());
                logger.debug("[{}][{}]: throttling allocation [{}] to [{}] in order to reuse its unallocated persistent store",
                    unassignedShard.index(), unassignedShard.id(), unassignedShard, nodeWithHighestMatch.node());
                // we are throttling this, as we have enough other shards to allocate to this node, so ignore it for now
                return UnassignedShardDecision.throttleDecision(
                    "returned a THROTTLE decision on each node that has an existing copy of the shard, so waiting to re-use one " +
                        "of those copies", matchingNodes.nodeDecisions);
            } else {
                logger.debug("[{}][{}]: allocating [{}] to [{}] in order to reuse its unallocated persistent store", shard.index(), shard.id(), shard, nodeWithHighestMatch.node());
                logger.debug("[{}][{}]: allocating [{}] to [{}] in order to reuse its unallocated persistent store",
                    unassignedShard.index(), unassignedShard.id(), unassignedShard, nodeWithHighestMatch.node());
                // we found a match
                unassignedIterator.initialize(nodeWithHighestMatch.nodeId(), null, allocation.clusterInfo().getShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE), allocation.changes());
            }
        } else if (matchingNodes.hasAnyData() == false) {
            // if we didn't manage to find *any* data (regardless of matching sizes), check if the allocation of the replica shard needs to be delayed
            ignoreUnassignedIfDelayed(unassignedIterator, shard, allocation.changes());
        }
                return UnassignedShardDecision.yesDecision(
                    "allocating to node [" + nodeWithHighestMatch.nodeId() + "] in order to re-use its unallocated persistent store",
                    nodeWithHighestMatch.nodeId(), null, matchingNodes.nodeDecisions);
            }
        } else if (matchingNodes.hasAnyData() == false && unassignedShard.unassignedInfo().isDelayed()) {
            // if we didn't manage to find *any* data (regardless of matching sizes), and the replica is
            // unassigned due to a node leaving, so we delay allocation of this replica to see if the
            // node with the shard copy will rejoin so we can re-use the copy it has
            logger.debug("{}: allocation of [{}] is delayed", unassignedShard.shardId(), unassignedShard);
            return UnassignedShardDecision.noDecision(AllocationStatus.DELAYED_ALLOCATION,
                "not allocating this shard, no nodes contain data for the replica and allocation is delayed");
        }

    /**
     * Check if the allocation of the replica is to be delayed. Compute the delay and if it is delayed, add it to the ignore unassigned list
     * Note: we only care about replica in delayed allocation, since if we have an unassigned primary it
     * will anyhow wait to find an existing copy of the shard to be allocated
     * Note: the other side of the equation is scheduling a reroute in a timely manner, which happens in the RoutingService
     *
     * PUBLIC FOR TESTS!
     *
     * @param unassignedIterator iterator over unassigned shards
     * @param shard              the shard which might be delayed
     */
    public void ignoreUnassignedIfDelayed(RoutingNodes.UnassignedShards.UnassignedIterator unassignedIterator, ShardRouting shard, RoutingChangesObserver changes) {
        if (shard.unassignedInfo().isDelayed()) {
            logger.debug("{}: allocation of [{}] is delayed", shard.shardId(), shard);
            unassignedIterator.removeAndIgnore(AllocationStatus.DELAYED_ALLOCATION, changes);
        }
        return UnassignedShardDecision.DECISION_NOT_TAKEN;
    }

    /**
@@ -215,10 +218,15 @@ public abstract class ReplicaShardAllocator extends AbstractComponent {
     *
     * Returns the best allocation decision for allocating the shard on any node (i.e. YES if at least one
     * node decided YES, THROTTLE if at least one node decided THROTTLE, and NO if none of the nodes decided
     * YES or THROTTLE.
     * YES or THROTTLE). If the explain flag is turned on AND the decision is NO or THROTTLE, then this method
     * also returns a map of nodes to decisions (second value in the tuple) to use for explanations; if the explain
     * flag is off, the second value in the return tuple will be null.
     */
    private Decision canBeAllocatedToAtLeastOneNode(ShardRouting shard, RoutingAllocation allocation) {
    private Tuple<Decision, Map<String, Decision>> canBeAllocatedToAtLeastOneNode(ShardRouting shard,
                                                                                  RoutingAllocation allocation,
                                                                                  boolean explain) {
        Decision madeDecision = Decision.NO;
        Map<String, Decision> nodeDecisions = new HashMap<>();
        for (ObjectCursor<DiscoveryNode> cursor : allocation.nodes().getDataNodes().values()) {
            RoutingNode node = allocation.routingNodes().node(cursor.value.getId());
            if (node == null) {
@@ -227,13 +235,16 @@ public abstract class ReplicaShardAllocator extends AbstractComponent {
            // if we can't allocate it on a node, ignore it, for example, this handles
            // cases for only allocating a replica after a primary
            Decision decision = allocation.deciders().canAllocate(shard, node, allocation);
            if (explain) {
                nodeDecisions.put(node.nodeId(), decision);
            }
            if (decision.type() == Decision.Type.YES) {
                return decision;
                return Tuple.tuple(decision, null);
            } else if (madeDecision.type() == Decision.Type.NO && decision.type() == Decision.Type.THROTTLE) {
                madeDecision = decision;
            }
        }
        return madeDecision;
        return Tuple.tuple(madeDecision, explain ? nodeDecisions : null);
    }

    /**
@@ -254,8 +265,10 @@ public abstract class ReplicaShardAllocator extends AbstractComponent {

    private MatchingNodes findMatchingNodes(ShardRouting shard, RoutingAllocation allocation,
                                            TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore,
                                            AsyncShardFetch.FetchResult<TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData> data) {
                                            AsyncShardFetch.FetchResult<TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData> data,
                                            boolean explain) {
        ObjectLongMap<DiscoveryNode> nodesToSize = new ObjectLongHashMap<>();
        Map<String, Decision> nodeDecisions = new HashMap<>();
        for (Map.Entry<DiscoveryNode, TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData> nodeStoreEntry : data.getData().entrySet()) {
            DiscoveryNode discoNode = nodeStoreEntry.getKey();
            TransportNodesListShardStoreMetaData.StoreFilesMetaData storeFilesMetaData = nodeStoreEntry.getValue().storeFilesMetaData();
@@ -273,6 +286,10 @@ public abstract class ReplicaShardAllocator extends AbstractComponent {
            // we only check for NO, since if this node is THROTTLING and it has enough "same data"
            // then we will try and assign it next time
            Decision decision = allocation.deciders().canAllocate(shard, node, allocation);
            if (explain) {
                nodeDecisions.put(node.nodeId(), decision);
            }

            if (decision.type() == Decision.Type.NO) {
                continue;
            }
@@ -297,7 +314,7 @@ public abstract class ReplicaShardAllocator extends AbstractComponent {
            }
        }

        return new MatchingNodes(nodesToSize);
        return new MatchingNodes(nodesToSize, explain ? nodeDecisions : null);
    }

    protected abstract AsyncShardFetch.FetchResult<TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData> fetchData(ShardRouting shard, RoutingAllocation allocation);
@@ -305,9 +322,12 @@ public abstract class ReplicaShardAllocator extends AbstractComponent {
    static class MatchingNodes {
        private final ObjectLongMap<DiscoveryNode> nodesToSize;
        private final DiscoveryNode nodeWithHighestMatch;
        @Nullable
        private final Map<String, Decision> nodeDecisions;

        public MatchingNodes(ObjectLongMap<DiscoveryNode> nodesToSize) {
        public MatchingNodes(ObjectLongMap<DiscoveryNode> nodesToSize, @Nullable Map<String, Decision> nodeDecisions) {
            this.nodesToSize = nodesToSize;
            this.nodeDecisions = nodeDecisions;

            long highestMatchSize = 0;
            DiscoveryNode highestMatchNode = null;
@@ -340,5 +360,13 @@ public abstract class ReplicaShardAllocator extends AbstractComponent {
        public boolean hasAnyData() {
            return nodesToSize.isEmpty() == false;
        }

        /**
         * The decisions map for all nodes with a shard copy, if available.
         */
        @Nullable
        public Map<String, Decision> getNodeDecisions() {
            return nodeDecisions;
        }
    }
}
@@ -0,0 +1,116 @@
/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.cluster.routing.allocation;

import org.elasticsearch.cluster.routing.UnassignedInfo.AllocationStatus;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.test.ESTestCase;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
 * Unit tests for the {@link UnassignedShardDecision} class.
 */
public class UnassignedShardDecisionTests extends ESTestCase {

    public void testDecisionNotTaken() {
        UnassignedShardDecision unassignedShardDecision = UnassignedShardDecision.DECISION_NOT_TAKEN;
        assertFalse(unassignedShardDecision.isDecisionTaken());
        assertNull(unassignedShardDecision.getFinalDecision());
        assertNull(unassignedShardDecision.getAllocationStatus());
        assertNull(unassignedShardDecision.getAllocationId());
        assertNull(unassignedShardDecision.getAssignedNodeId());
        assertNull(unassignedShardDecision.getFinalExplanation());
        assertNull(unassignedShardDecision.getNodeDecisions());
        expectThrows(IllegalArgumentException.class, () -> unassignedShardDecision.getFinalDecisionSafe());
        expectThrows(IllegalArgumentException.class, () -> unassignedShardDecision.getFinalExplanationSafe());
    }

    public void testNoDecision() {
        final AllocationStatus allocationStatus = randomFrom(
            AllocationStatus.DELAYED_ALLOCATION, AllocationStatus.NO_VALID_SHARD_COPY, AllocationStatus.FETCHING_SHARD_DATA
        );
        UnassignedShardDecision noDecision = UnassignedShardDecision.noDecision(allocationStatus, "something is wrong");
        assertTrue(noDecision.isDecisionTaken());
        assertEquals(Decision.Type.NO, noDecision.getFinalDecision().type());
        assertEquals(allocationStatus, noDecision.getAllocationStatus());
        assertEquals("something is wrong", noDecision.getFinalExplanation());
        assertNull(noDecision.getNodeDecisions());
        assertNull(noDecision.getAssignedNodeId());
        assertNull(noDecision.getAllocationId());

        Map<String, Decision> nodeDecisions = new HashMap<>();
        nodeDecisions.put("node1", Decision.NO);
        nodeDecisions.put("node2", Decision.NO);
        noDecision = UnassignedShardDecision.noDecision(AllocationStatus.DECIDERS_NO, "something is wrong", nodeDecisions);
        assertTrue(noDecision.isDecisionTaken());
        assertEquals(Decision.Type.NO, noDecision.getFinalDecision().type());
        assertEquals(AllocationStatus.DECIDERS_NO, noDecision.getAllocationStatus());
        assertEquals("something is wrong", noDecision.getFinalExplanation());
        assertEquals(nodeDecisions, noDecision.getNodeDecisions());
        assertNull(noDecision.getAssignedNodeId());
        assertNull(noDecision.getAllocationId());

        // test bad values
        expectThrows(NullPointerException.class, () -> UnassignedShardDecision.noDecision(null, "a"));
        expectThrows(NullPointerException.class, () -> UnassignedShardDecision.noDecision(AllocationStatus.DECIDERS_NO, null));
    }

    public void testThrottleDecision() {
        Map<String, Decision> nodeDecisions = new HashMap<>();
        nodeDecisions.put("node1", Decision.NO);
        nodeDecisions.put("node2", Decision.THROTTLE);
        UnassignedShardDecision throttleDecision = UnassignedShardDecision.throttleDecision("too much happening", nodeDecisions);
        assertTrue(throttleDecision.isDecisionTaken());
        assertEquals(Decision.Type.THROTTLE, throttleDecision.getFinalDecision().type());
        assertEquals(AllocationStatus.DECIDERS_THROTTLED, throttleDecision.getAllocationStatus());
        assertEquals("too much happening", throttleDecision.getFinalExplanation());
        assertEquals(nodeDecisions, throttleDecision.getNodeDecisions());
        assertNull(throttleDecision.getAssignedNodeId());
        assertNull(throttleDecision.getAllocationId());

        // test bad values
        expectThrows(NullPointerException.class, () -> UnassignedShardDecision.throttleDecision(null, Collections.emptyMap()));
    }

    public void testYesDecision() {
        Map<String, Decision> nodeDecisions = new HashMap<>();
        nodeDecisions.put("node1", Decision.YES);
        nodeDecisions.put("node2", Decision.NO);
        String allocId = randomBoolean() ? "allocId" : null;
        UnassignedShardDecision yesDecision = UnassignedShardDecision.yesDecision(
            "node was very kind", "node1", allocId, nodeDecisions
        );
        assertTrue(yesDecision.isDecisionTaken());
        assertEquals(Decision.Type.YES, yesDecision.getFinalDecision().type());
        assertNull(yesDecision.getAllocationStatus());
        assertEquals("node was very kind", yesDecision.getFinalExplanation());
        assertEquals(nodeDecisions, yesDecision.getNodeDecisions());
        assertEquals("node1", yesDecision.getAssignedNodeId());
        assertEquals(allocId, yesDecision.getAllocationId());

        expectThrows(NullPointerException.class,
            () -> UnassignedShardDecision.yesDecision(null, "a", randomBoolean() ? "a" : null, Collections.emptyMap()));
        expectThrows(NullPointerException.class,
            () -> UnassignedShardDecision.yesDecision("a", null, null, Collections.emptyMap()));
    }
}
@@ -38,10 +38,7 @@ import org.elasticsearch.cluster.routing.allocation.decider.SameShardAllocationD
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.LocalTransportAddress;
import org.elasticsearch.gateway.AsyncShardFetch;
import org.elasticsearch.gateway.GatewayAllocator;
import org.elasticsearch.gateway.ReplicaShardAllocator;
import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.gateway.NoopGatewayAllocator;

@@ -209,14 +206,6 @@ public abstract class ESAllocationTestCase extends ESTestCase {
     * Mocks behavior in ReplicaShardAllocator to remove delayed shards from list of unassigned shards so they don't get reassigned yet.
     */
    protected static class DelayedShardsMockGatewayAllocator extends GatewayAllocator {
        private final ReplicaShardAllocator replicaShardAllocator = new ReplicaShardAllocator(Settings.EMPTY) {
            @Override
            protected AsyncShardFetch.FetchResult<TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData>
                        fetchData(ShardRouting shard, RoutingAllocation allocation) {
                return new AsyncShardFetch.FetchResult<>(shard.shardId(), null, Collections.emptySet(), Collections.emptySet());
            }
        };

        public DelayedShardsMockGatewayAllocator() {
            super(Settings.EMPTY, null, null);
@@ -236,7 +225,9 @@ public abstract class ESAllocationTestCase extends ESTestCase {
                if (shard.primary() || shard.unassignedInfo().getReason() == UnassignedInfo.Reason.INDEX_CREATED) {
                    continue;
                }
                replicaShardAllocator.ignoreUnassignedIfDelayed(unassignedIterator, shard, allocation.changes());
                if (shard.unassignedInfo().isDelayed()) {
                    unassignedIterator.removeAndIgnore(UnassignedInfo.AllocationStatus.DELAYED_ALLOCATION, allocation.changes());
                }
            }
        }
    }