Primary shard allocator observes limits in forcing allocation (#19811)
Primary shard allocation observes limits in forcing allocation Previously, during primary shard allocation of shards with prior allocation IDs, if all nodes returned a NO decision for allocation (e.g. the settings blocked allocation on that node), we would choose one of those nodes and force the primary shard to be allocated to it. However, this meant that primary shard allocation would not adhere to the decision of the MaxRetryAllocationDecider, which would lead to attempting to allocate a shard that has already failed N times (presumably due to some configuration issue). This commit solves this issue by introducing the notion of force-allocating a primary shard to a node, where each decider implementation must decide whether this is allowed or not. In the case of MaxRetryAllocationDecider, it just forwards the request to canAllocate. Closes #19446
This commit is contained in:
parent
eea1bc719b
commit
88aff40eef
|
@ -252,6 +252,17 @@ public class RoutingAllocation {
|
|||
nodes.add(nodeId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns whether the given node id should be ignored from consideration when {@link AllocationDeciders}
|
||||
* is deciding whether to allocate the specified shard id to that node. The node will be ignored if
|
||||
* the specified shard failed on that node, triggering the current round of allocation. Since the shard
|
||||
* just failed on that node, we don't want to try to reassign it there, if the node is still a part
|
||||
* of the cluster.
|
||||
*
|
||||
* @param shardId the shard id to be allocated
|
||||
* @param nodeId the node id to check against
|
||||
* @return true if the node id should be ignored in allocation decisions, false otherwise
|
||||
*/
|
||||
public boolean shouldIgnoreShardForNode(ShardId shardId, String nodeId) {
|
||||
if (ignoredShardToNodes == null) {
|
||||
return false;
|
||||
|
|
|
@ -23,6 +23,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
|
|||
import org.elasticsearch.cluster.routing.RoutingNode;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.Decision.Type;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
||||
|
@ -98,4 +99,30 @@ public abstract class AllocationDecider extends AbstractComponent {
|
|||
public Decision canRebalance(RoutingAllocation allocation) {
|
||||
return Decision.ALWAYS;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a {@link Decision} whether the given primary shard can be
|
||||
* forcibly allocated on the given node. This method should only be called
|
||||
* for unassigned primary shards where the node has a shard copy on disk.
|
||||
*
|
||||
* Note: all implementations that override this behavior should take into account
|
||||
* the results of {@link #canAllocate(ShardRouting, RoutingNode, RoutingAllocation)}
|
||||
* before making a decision on force allocation, because force allocation should only
|
||||
* be considered if all deciders return {@link Decision#NO}.
|
||||
*/
|
||||
public Decision canForceAllocatePrimary(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
|
||||
assert shardRouting.primary() : "must not call canForceAllocatePrimary on a non-primary shard " + shardRouting;
|
||||
assert shardRouting.unassigned() : "must not call canForceAllocatePrimary on an assigned shard " + shardRouting;
|
||||
Decision decision = canAllocate(shardRouting, node, allocation);
|
||||
if (decision.type() == Type.NO) {
|
||||
// On a NO decision, by default, we allow force allocating the primary.
|
||||
return allocation.decision(Decision.YES,
|
||||
decision.label(),
|
||||
"primary shard [{}] allowed to force allocate on node [{}]",
|
||||
shardRouting.shardId(), node.nodeId());
|
||||
} else {
|
||||
// On a THROTTLE/YES decision, we use the same decision instead of forcing allocation
|
||||
return decision;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -196,4 +196,32 @@ public class AllocationDeciders extends AllocationDecider {
|
|||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Decision canForceAllocatePrimary(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
|
||||
assert shardRouting.primary() : "must not call canForceAllocatePrimary on a non-primary shard routing " + shardRouting;
|
||||
|
||||
if (allocation.shouldIgnoreShardForNode(shardRouting.shardId(), node.nodeId())) {
|
||||
return Decision.NO;
|
||||
}
|
||||
Decision.Multi ret = new Decision.Multi();
|
||||
for (AllocationDecider decider : allocations) {
|
||||
Decision decision = decider.canForceAllocatePrimary(shardRouting, node, allocation);
|
||||
// short track if a NO is returned.
|
||||
if (decision == Decision.NO) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Shard [{}] can not be forcefully allocated to node [{}] due to [{}].",
|
||||
shardRouting.shardId(), node.nodeId(), decider.getClass().getSimpleName());
|
||||
}
|
||||
if (!allocation.debugDecision()) {
|
||||
return decision;
|
||||
} else {
|
||||
ret.add(decision);
|
||||
}
|
||||
} else if (decision != Decision.ALWAYS) {
|
||||
ret.add(decision);
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -53,7 +53,7 @@ public abstract class Decision implements ToXContent {
|
|||
* @param explanationParams additional parameters for the decision
|
||||
* @return new {@link Decision} instance
|
||||
*/
|
||||
public static Decision single(Type type, String label, String explanation, Object... explanationParams) {
|
||||
public static Decision single(Type type, @Nullable String label, @Nullable String explanation, @Nullable Object... explanationParams) {
|
||||
return new Single(type, label, explanation, explanationParams);
|
||||
}
|
||||
|
||||
|
@ -146,6 +146,9 @@ public abstract class Decision implements ToXContent {
|
|||
*/
|
||||
public abstract Type type();
|
||||
|
||||
/**
|
||||
* Get the description label for this decision.
|
||||
*/
|
||||
@Nullable
|
||||
public abstract String label();
|
||||
|
||||
|
|
|
@ -80,4 +80,12 @@ public class MaxRetryAllocationDecider extends AllocationDecider {
|
|||
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
|
||||
return canAllocate(shardRouting, allocation);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Decision canForceAllocatePrimary(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
|
||||
assert shardRouting.primary() : "must not call canForceAllocatePrimary on a non-primary shard " + shardRouting;
|
||||
// check if we have passed the maximum retry threshold through canAllocate,
|
||||
// if so, we don't want to force the primary allocation here
|
||||
return canAllocate(shardRouting, node, allocation);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -27,10 +27,12 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
|
|||
import org.elasticsearch.cluster.routing.RoutingNode;
|
||||
import org.elasticsearch.cluster.routing.RoutingNodes;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.UnassignedInfo;
|
||||
import org.elasticsearch.cluster.routing.UnassignedInfo.AllocationStatus;
|
||||
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Setting.Property;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
@ -47,8 +49,18 @@ import java.util.function.Function;
|
|||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* The primary shard allocator allocates primary shard that were not created as
|
||||
* a result of an API to a node that held them last to be recovered.
|
||||
* The primary shard allocator allocates unassigned primary shards to nodes that hold
|
||||
* valid copies of the unassigned primaries. It does this by iterating over all unassigned
|
||||
* primary shards in the routing table and fetching shard metadata from each node in the cluster
|
||||
* that holds a copy of the shard. The shard metadata from each node is compared against the
|
||||
* set of valid allocation IDs and for all valid shard copies (if any), the primary shard allocator
|
||||
 * executes the allocation deciders to choose a copy to assign the primary shard to.
|
||||
*
|
||||
* Note that the PrimaryShardAllocator does *not* allocate primaries on index creation
|
||||
* (see {@link org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator}),
|
||||
* nor does it allocate primaries when a primary shard failed and there is a valid replica
|
||||
* copy that can immediately be promoted to primary, as this takes place in
|
||||
* {@link RoutingNodes#failShard(ESLogger, ShardRouting, UnassignedInfo, IndexMetaData)}.
|
||||
*/
|
||||
public abstract class PrimaryShardAllocator extends AbstractComponent {
|
||||
|
||||
|
@ -154,17 +166,35 @@ public abstract class PrimaryShardAllocator extends AbstractComponent {
|
|||
continue;
|
||||
}
|
||||
|
||||
final NodesToAllocate nodesToAllocate = buildNodesToAllocate(shard, allocation, nodeShardsResult.orderedAllocationCandidates);
|
||||
final NodesToAllocate nodesToAllocate = buildNodesToAllocate(
|
||||
allocation, nodeShardsResult.orderedAllocationCandidates, shard, false
|
||||
);
|
||||
if (nodesToAllocate.yesNodeShards.isEmpty() == false) {
|
||||
NodeGatewayStartedShards nodeShardState = nodesToAllocate.yesNodeShards.get(0);
|
||||
logger.debug("[{}][{}]: allocating [{}] to [{}] on primary allocation", shard.index(), shard.id(), shard, nodeShardState.getNode());
|
||||
changed = true;
|
||||
unassignedIterator.initialize(nodeShardState.getNode().getId(), nodeShardState.allocationId(), ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
|
||||
} else if (nodesToAllocate.throttleNodeShards.isEmpty() == true && nodesToAllocate.noNodeShards.isEmpty() == false) {
|
||||
NodeGatewayStartedShards nodeShardState = nodesToAllocate.noNodeShards.get(0);
|
||||
logger.debug("[{}][{}]: forcing allocating [{}] to [{}] on primary allocation", shard.index(), shard.id(), shard, nodeShardState.getNode());
|
||||
// The deciders returned a NO decision for all nodes with shard copies, so we check if primary shard
|
||||
// can be force-allocated to one of the nodes.
|
||||
final NodesToAllocate nodesToForceAllocate = buildNodesToAllocate(
|
||||
allocation, nodeShardsResult.orderedAllocationCandidates, shard, true
|
||||
);
|
||||
if (nodesToForceAllocate.yesNodeShards.isEmpty() == false) {
|
||||
NodeGatewayStartedShards nodeShardState = nodesToForceAllocate.yesNodeShards.get(0);
|
||||
logger.debug("[{}][{}]: allocating [{}] to [{}] on forced primary allocation",
|
||||
shard.index(), shard.id(), shard, nodeShardState.getNode());
|
||||
changed = true;
|
||||
unassignedIterator.initialize(nodeShardState.getNode().getId(), nodeShardState.allocationId(), ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
|
||||
unassignedIterator.initialize(nodeShardState.getNode().getId(), nodeShardState.allocationId(),
|
||||
ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
|
||||
} else if (nodesToForceAllocate.throttleNodeShards.isEmpty() == false) {
|
||||
logger.debug("[{}][{}]: throttling allocation [{}] to [{}] on forced primary allocation",
|
||||
shard.index(), shard.id(), shard, nodesToForceAllocate.throttleNodeShards);
|
||||
changed |= unassignedIterator.removeAndIgnore(AllocationStatus.DECIDERS_THROTTLED);
|
||||
} else {
|
||||
logger.debug("[{}][{}]: forced primary allocation denied [{}]", shard.index(), shard.id(), shard);
|
||||
changed |= unassignedIterator.removeAndIgnore(AllocationStatus.DECIDERS_NO);
|
||||
}
|
||||
} else {
|
||||
// we are throttling this, but we have enough to allocate to this node, ignore it for now
|
||||
logger.debug("[{}][{}]: throttling allocation [{}] to [{}] on primary allocation", shard.index(), shard.id(), shard, nodesToAllocate.throttleNodeShards);
|
||||
|
@ -268,7 +298,10 @@ public abstract class PrimaryShardAllocator extends AbstractComponent {
|
|||
/**
|
||||
* Split the list of node shard states into groups yes/no/throttle based on allocation deciders
|
||||
*/
|
||||
private NodesToAllocate buildNodesToAllocate(ShardRouting shard, RoutingAllocation allocation, List<NodeGatewayStartedShards> nodeShardStates) {
|
||||
private NodesToAllocate buildNodesToAllocate(RoutingAllocation allocation,
|
||||
List<NodeGatewayStartedShards> nodeShardStates,
|
||||
ShardRouting shardRouting,
|
||||
boolean forceAllocate) {
|
||||
List<NodeGatewayStartedShards> yesNodeShards = new ArrayList<>();
|
||||
List<NodeGatewayStartedShards> throttledNodeShards = new ArrayList<>();
|
||||
List<NodeGatewayStartedShards> noNodeShards = new ArrayList<>();
|
||||
|
@ -278,7 +311,8 @@ public abstract class PrimaryShardAllocator extends AbstractComponent {
|
|||
continue;
|
||||
}
|
||||
|
||||
Decision decision = allocation.deciders().canAllocate(shard, node, allocation);
|
||||
Decision decision = forceAllocate ? allocation.deciders().canForceAllocatePrimary(shardRouting, node, allocation) :
|
||||
allocation.deciders().canAllocate(shardRouting, node, allocation);
|
||||
if (decision.type() == Decision.Type.THROTTLE) {
|
||||
throttledNodeShards.add(nodeShardState);
|
||||
} else if (decision.type() == Decision.Type.NO) {
|
||||
|
|
|
@ -24,6 +24,7 @@ import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequestBuild
|
|||
import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse;
|
||||
import org.elasticsearch.action.support.ActiveShardCount;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.routing.allocation.command.AllocateEmptyPrimaryAllocationCommand;
|
||||
import org.elasticsearch.cluster.routing.allocation.command.AllocateStalePrimaryAllocationCommand;
|
||||
import org.elasticsearch.common.collect.ImmutableOpenIntMap;
|
||||
|
@ -219,4 +220,32 @@ public class PrimaryAllocationIT extends ESIntegTestCase {
|
|||
ensureYellow("test");
|
||||
assertHitCount(client().prepareSearch().setSize(0).setQuery(matchAllQuery()).get(), 1L);
|
||||
}
|
||||
|
||||
/**
|
||||
* This test ensures that for an unassigned primary shard that has a valid shard copy on at least one node,
|
||||
* we will force allocate the primary shard to one of those nodes, even if the allocation deciders all return
|
||||
* a NO decision to allocate.
|
||||
*/
|
||||
public void testForceAllocatePrimaryOnNoDecision() throws Exception {
|
||||
logger.info("--> starting 1 node");
|
||||
final String node = internalCluster().startNodeAsync().get();
|
||||
logger.info("--> creating index with 1 primary and 0 replicas");
|
||||
final String indexName = "test-idx";
|
||||
assertAcked(client().admin().indices()
|
||||
.prepareCreate(indexName)
|
||||
.setSettings(Settings.builder().put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
|
||||
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0))
|
||||
.get());
|
||||
logger.info("--> update the settings to prevent allocation to the data node");
|
||||
assertTrue(client().admin().indices().prepareUpdateSettings(indexName)
|
||||
.setSettings(Settings.builder().put(IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_SETTING.getKey() + "_name", node))
|
||||
.get()
|
||||
.isAcknowledged());
|
||||
logger.info("--> full cluster restart");
|
||||
internalCluster().fullRestart();
|
||||
logger.info("--> checking that the primary shard is force allocated to the data node despite being blocked by the exclude filter");
|
||||
ensureGreen(indexName);
|
||||
assertEquals(1, client().admin().cluster().prepareState().get().getState()
|
||||
.routingTable().index(indexName).shardsWithState(ShardRoutingState.STARTED).size());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -27,9 +27,11 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
|
|||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.routing.RoutingTable;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
|
||||
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.test.ESAllocationTestCase;
|
||||
|
@ -147,9 +149,13 @@ public class MaxRetryAllocationDeciderTests extends ESAllocationTestCase {
|
|||
routingTable = result.routingTable();
|
||||
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
|
||||
assertEquals(routingTable.index("idx").shards().size(), 1);
|
||||
assertEquals(routingTable.index("idx").shard(0).shards().get(0).state(), INITIALIZING);
|
||||
assertEquals(routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getNumFailedAllocations(), i+1);
|
||||
assertEquals(routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getMessage(), "boom" + i);
|
||||
ShardRouting unassignedPrimary = routingTable.index("idx").shard(0).shards().get(0);
|
||||
assertEquals(unassignedPrimary.state(), INITIALIZING);
|
||||
assertEquals(unassignedPrimary.unassignedInfo().getNumFailedAllocations(), i+1);
|
||||
assertEquals(unassignedPrimary.unassignedInfo().getMessage(), "boom" + i);
|
||||
// MaxRetryAllocationDecider#canForceAllocatePrimary should return YES decisions because canAllocate returns YES here
|
||||
assertEquals(Decision.YES, new MaxRetryAllocationDecider(Settings.EMPTY).canForceAllocatePrimary(
|
||||
unassignedPrimary, null, new RoutingAllocation(null, null, clusterState, null, 0, false)));
|
||||
}
|
||||
// now we go and check that we actually stick to unassigned on the next failure
|
||||
{
|
||||
|
@ -161,9 +167,13 @@ public class MaxRetryAllocationDeciderTests extends ESAllocationTestCase {
|
|||
routingTable = result.routingTable();
|
||||
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
|
||||
assertEquals(routingTable.index("idx").shards().size(), 1);
|
||||
assertEquals(routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getNumFailedAllocations(), retries);
|
||||
assertEquals(routingTable.index("idx").shard(0).shards().get(0).state(), UNASSIGNED);
|
||||
assertEquals(routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getMessage(), "boom");
|
||||
ShardRouting unassignedPrimary = routingTable.index("idx").shard(0).shards().get(0);
|
||||
assertEquals(unassignedPrimary.unassignedInfo().getNumFailedAllocations(), retries);
|
||||
assertEquals(unassignedPrimary.state(), UNASSIGNED);
|
||||
assertEquals(unassignedPrimary.unassignedInfo().getMessage(), "boom");
|
||||
// MaxRetryAllocationDecider#canForceAllocatePrimary should return a NO decision because canAllocate returns NO here
|
||||
assertEquals(Decision.NO, new MaxRetryAllocationDecider(Settings.EMPTY).canForceAllocatePrimary(
|
||||
unassignedPrimary, null, new RoutingAllocation(null, null, clusterState, null, 0, false)));
|
||||
}
|
||||
|
||||
// change the settings and ensure we can do another round of allocation for that index.
|
||||
|
@ -179,13 +189,17 @@ public class MaxRetryAllocationDeciderTests extends ESAllocationTestCase {
|
|||
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
|
||||
// good we are initializing and we are maintaining failure information
|
||||
assertEquals(routingTable.index("idx").shards().size(), 1);
|
||||
assertEquals(routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getNumFailedAllocations(), retries);
|
||||
assertEquals(routingTable.index("idx").shard(0).shards().get(0).state(), INITIALIZING);
|
||||
assertEquals(routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getMessage(), "boom");
|
||||
ShardRouting unassignedPrimary = routingTable.index("idx").shard(0).shards().get(0);
|
||||
assertEquals(unassignedPrimary.unassignedInfo().getNumFailedAllocations(), retries);
|
||||
assertEquals(unassignedPrimary.state(), INITIALIZING);
|
||||
assertEquals(unassignedPrimary.unassignedInfo().getMessage(), "boom");
|
||||
// bumped up the max retry count, so canForceAllocatePrimary should return a YES decision
|
||||
assertEquals(Decision.YES, new MaxRetryAllocationDecider(Settings.EMPTY).canForceAllocatePrimary(
|
||||
routingTable.index("idx").shard(0).shards().get(0), null, new RoutingAllocation(null, null, clusterState, null, 0, false)));
|
||||
|
||||
// now we start the shard
|
||||
routingTable = strategy.applyStartedShards(clusterState, Collections.singletonList(routingTable.index("idx")
|
||||
.shard(0).shards().get(0))).routingTable();
|
||||
routingTable = strategy.applyStartedShards(clusterState, Collections.singletonList(
|
||||
routingTable.index("idx").shard(0).shards().get(0))).routingTable();
|
||||
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
|
||||
|
||||
// all counters have been reset to 0 ie. no unassigned info
|
||||
|
@ -202,8 +216,13 @@ public class MaxRetryAllocationDeciderTests extends ESAllocationTestCase {
|
|||
routingTable = result.routingTable();
|
||||
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
|
||||
assertEquals(routingTable.index("idx").shards().size(), 1);
|
||||
assertEquals(routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getNumFailedAllocations(), 1);
|
||||
assertEquals(routingTable.index("idx").shard(0).shards().get(0).state(), INITIALIZING);
|
||||
assertEquals(routingTable.index("idx").shard(0).shards().get(0).unassignedInfo().getMessage(), "ZOOOMG");
|
||||
unassignedPrimary = routingTable.index("idx").shard(0).shards().get(0);
|
||||
assertEquals(unassignedPrimary.unassignedInfo().getNumFailedAllocations(), 1);
|
||||
assertEquals(unassignedPrimary.state(), INITIALIZING);
|
||||
assertEquals(unassignedPrimary.unassignedInfo().getMessage(), "ZOOOMG");
|
||||
// Counter reset, so MaxRetryAllocationDecider#canForceAllocatePrimary should return a YES decision
|
||||
assertEquals(Decision.YES, new MaxRetryAllocationDecider(Settings.EMPTY).canForceAllocatePrimary(
|
||||
unassignedPrimary, null, new RoutingAllocation(null, null, clusterState, null, 0, false)));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -30,12 +30,16 @@ import org.elasticsearch.cluster.metadata.MetaData;
|
|||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.routing.RestoreSource;
|
||||
import org.elasticsearch.cluster.routing.RoutingNode;
|
||||
import org.elasticsearch.cluster.routing.RoutingNodes;
|
||||
import org.elasticsearch.cluster.routing.RoutingTable;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.ShardRoutingState;
|
||||
import org.elasticsearch.cluster.routing.UnassignedInfo.AllocationStatus;
|
||||
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.UUIDs;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
@ -49,6 +53,7 @@ import org.junit.Before;
|
|||
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.hamcrest.Matchers.anyOf;
|
||||
|
@ -192,6 +197,74 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
|
|||
assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that when the nodes with prior copies of the given shard all return a decision of NO, but
|
||||
* {@link AllocationDecider#canForceAllocatePrimary(ShardRouting, RoutingNode, RoutingAllocation)}
|
||||
* returns a YES decision for at least one of those NO nodes, then we force allocate to one of them
|
||||
*/
|
||||
public void testForceAllocatePrimary() {
|
||||
testAllocator.addData(node1, ShardStateMetaData.NO_VERSION, "allocId1", randomBoolean());
|
||||
AllocationDeciders deciders = new AllocationDeciders(Settings.EMPTY, new AllocationDecider[] {
|
||||
// since the deciders return a NO decision for allocating a shard (due to the guaranteed NO decision from the second decider),
|
||||
// the allocator will see if it can force assign the primary, where the decision will be YES
|
||||
new TestAllocateDecision(randomBoolean() ? Decision.YES : Decision.NO), getNoDeciderThatAllowsForceAllocate()
|
||||
});
|
||||
RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(deciders, false, Version.CURRENT, "allocId1");
|
||||
boolean changed = testAllocator.allocateUnassigned(allocation);
|
||||
assertTrue(changed);
|
||||
assertTrue(allocation.routingNodes().unassigned().ignored().isEmpty());
|
||||
assertEquals(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), 1);
|
||||
assertEquals(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), node1.getId());
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that when the nodes with prior copies of the given shard all return a decision of NO, and
|
||||
* {@link AllocationDecider#canForceAllocatePrimary(ShardRouting, RoutingNode, RoutingAllocation)}
|
||||
* returns a NO or THROTTLE decision for a node, then we do not force allocate to that node.
|
||||
*/
|
||||
public void testDontAllocateOnNoOrThrottleForceAllocationDecision() {
|
||||
testAllocator.addData(node1, ShardStateMetaData.NO_VERSION, "allocId1", randomBoolean());
|
||||
boolean forceDecisionNo = randomBoolean();
|
||||
AllocationDeciders deciders = new AllocationDeciders(Settings.EMPTY, new AllocationDecider[] {
|
||||
// since both deciders here return a NO decision for allocating a shard,
|
||||
// the allocator will see if it can force assign the primary, where the decision will be either NO or THROTTLE,
|
||||
// so the shard will remain un-initialized
|
||||
new TestAllocateDecision(Decision.NO), forceDecisionNo ? getNoDeciderThatDeniesForceAllocate() :
|
||||
getNoDeciderThatThrottlesForceAllocate()
|
||||
});
|
||||
RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(deciders, false, Version.CURRENT, "allocId1");
|
||||
boolean changed = testAllocator.allocateUnassigned(allocation);
|
||||
assertTrue(changed);
|
||||
List<ShardRouting> ignored = allocation.routingNodes().unassigned().ignored();
|
||||
assertEquals(ignored.size(), 1);
|
||||
assertEquals(ignored.get(0).unassignedInfo().getLastAllocationStatus(),
|
||||
forceDecisionNo ? AllocationStatus.DECIDERS_NO : AllocationStatus.DECIDERS_THROTTLED);
|
||||
assertTrue(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).isEmpty());
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that when the nodes with prior copies of the given shard return a THROTTLE decision,
|
||||
* then we do not force allocate to that node but instead throttle.
|
||||
*/
|
||||
public void testDontForceAllocateOnThrottleDecision() {
|
||||
testAllocator.addData(node1, ShardStateMetaData.NO_VERSION, "allocId1", randomBoolean());
|
||||
AllocationDeciders deciders = new AllocationDeciders(Settings.EMPTY, new AllocationDecider[] {
|
||||
// since we have a NO decision for allocating a shard (because the second decider returns a NO decision),
|
||||
// the allocator will see if it can force assign the primary, and in this case,
|
||||
// the TestAllocateDecision's decision for force allocating is to THROTTLE (using
|
||||
// the default behavior) so despite the other decider's decision to return YES for
|
||||
// force allocating the shard, we still THROTTLE due to the decision from TestAllocateDecision
|
||||
new TestAllocateDecision(Decision.THROTTLE), getNoDeciderThatAllowsForceAllocate()
|
||||
});
|
||||
RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(deciders, false, Version.CURRENT, "allocId1");
|
||||
boolean changed = testAllocator.allocateUnassigned(allocation);
|
||||
assertTrue(changed);
|
||||
List<ShardRouting> ignored = allocation.routingNodes().unassigned().ignored();
|
||||
assertEquals(ignored.size(), 1);
|
||||
assertEquals(ignored.get(0).unassignedInfo().getLastAllocationStatus(), AllocationStatus.DECIDERS_THROTTLED);
|
||||
assertTrue(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).isEmpty());
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that when there was a node that previously had the primary, it will be allocated to that same node again.
|
||||
*/
|
||||
|
@ -542,10 +615,11 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
|
|||
assertClusterHealthStatus(allocation, ClusterHealthStatus.RED);
|
||||
}
|
||||
|
||||
private RoutingAllocation routingAllocationWithOnePrimaryNoReplicas(AllocationDeciders deciders, boolean asNew, Version version, String... activeAllocationIds) {
|
||||
private RoutingAllocation routingAllocationWithOnePrimaryNoReplicas(AllocationDeciders deciders, boolean asNew, Version version,
|
||||
String... activeAllocationIds) {
|
||||
MetaData metaData = MetaData.builder()
|
||||
.put(IndexMetaData.builder(shardId.getIndexName()).settings(settings(version))
|
||||
.numberOfShards(1).numberOfReplicas(0).putActiveAllocationIds(0, Sets.newHashSet(activeAllocationIds)))
|
||||
.numberOfShards(1).numberOfReplicas(0).putActiveAllocationIds(shardId.id(), Sets.newHashSet(activeAllocationIds)))
|
||||
.build();
|
||||
RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
|
||||
if (asNew) {
|
||||
|
@ -573,6 +647,28 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
|
|||
assertThat(clusterStateHealth.getStatus().ordinal(), lessThanOrEqualTo(expectedStatus.ordinal()));
|
||||
}
|
||||
|
||||
private AllocationDecider getNoDeciderThatAllowsForceAllocate() {
|
||||
return getNoDeciderWithForceAllocate(Decision.YES);
|
||||
}
|
||||
|
||||
private AllocationDecider getNoDeciderThatThrottlesForceAllocate() {
|
||||
return getNoDeciderWithForceAllocate(Decision.THROTTLE);
|
||||
}
|
||||
|
||||
private AllocationDecider getNoDeciderThatDeniesForceAllocate() {
|
||||
return getNoDeciderWithForceAllocate(Decision.NO);
|
||||
}
|
||||
|
||||
private AllocationDecider getNoDeciderWithForceAllocate(final Decision forceAllocateDecision) {
|
||||
return new TestAllocateDecision(Decision.NO) {
|
||||
@Override
|
||||
public Decision canForceAllocatePrimary(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
|
||||
assert shardRouting.primary() : "cannot force allocate a non-primary shard " + shardRouting;
|
||||
return forceAllocateDecision;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
class TestAllocator extends PrimaryShardAllocator {
|
||||
|
||||
private Map<DiscoveryNode, TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> data;
|
||||
|
|
Loading…
Reference in New Issue