Validate routing commands using updated routing state (#42066)
When multiple commands are called in sequence, fetch shards from mutable, up-to-date routing nodes to ensure each command's changes are visible to subsequent commands. This addresses an issue uncovered during work on #41050.
commit 130c832e10
parent aea600fe7d
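In short: each command previously validated its target shard against allocation.routingTable(), an immutable snapshot taken before any command in the request has run, so an assignment made by an earlier command was not visible to the next one. The commands now resolve shards from allocation.routingNodes(), the mutable routing state that the commands themselves update. Below is a minimal sketch of the new lookup pattern used by the primary-allocation commands; the helper name findUnassignedPrimary is illustrative only, the actual change inlines this loop in each command.

import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;

// Scan the mutable list of unassigned shards instead of the immutable routing table,
// so a shard already assigned by a previous command in the same reroute request is skipped.
static ShardRouting findUnassignedPrimary(RoutingAllocation allocation, String index, int shardId) {
    for (ShardRouting shard : allocation.routingNodes().unassigned()) {
        if (shard.getIndexName().equals(index) && shard.getId() == shardId && shard.primary()) {
            return shard; // still unassigned, so the command may allocate it
        }
    }
    return null; // no unassigned primary left: already assigned, possibly by a previous command
}

When the lookup returns null, the command rejects the request with "primary [...][...] is already assigned", which is exactly what the new test below exercises by sending two conflicting commands in a single request.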
@@ -110,13 +110,20 @@ public class AllocateEmptyPrimaryAllocationCommand extends BasePrimaryAllocation
             return explainOrThrowMissingRoutingNode(allocation, explain, discoNode);
         }

-        final ShardRouting shardRouting;
         try {
-            shardRouting = allocation.routingTable().shardRoutingTable(index, shardId).primaryShard();
+            allocation.routingTable().shardRoutingTable(index, shardId).primaryShard();
         } catch (IndexNotFoundException | ShardNotFoundException e) {
             return explainOrThrowRejectedCommand(explain, allocation, e);
         }
-        if (shardRouting.unassigned() == false) {
+
+        ShardRouting shardRouting = null;
+        for (ShardRouting shard : allocation.routingNodes().unassigned()) {
+            if (shard.getIndexName().equals(index) && shard.getId() == shardId && shard.primary()) {
+                shardRouting = shard;
+                break;
+            }
+        }
+        if (shardRouting == null) {
             return explainOrThrowRejectedCommand(explain, allocation, "primary [" + index + "][" + shardId + "] is already assigned");
         }

@@ -23,7 +23,6 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.routing.RoutingNode;
 import org.elasticsearch.cluster.routing.RoutingNodes;
 import org.elasticsearch.cluster.routing.ShardRouting;
-import org.elasticsearch.cluster.routing.ShardRoutingState;
 import org.elasticsearch.cluster.routing.allocation.RerouteExplanation;
 import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
 import org.elasticsearch.cluster.routing.allocation.decider.Decision;
@@ -35,6 +34,7 @@ import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.index.shard.ShardNotFoundException;

 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;

 /**
@@ -101,20 +101,34 @@ public class AllocateReplicaAllocationCommand extends AbstractAllocateAllocation
             return explainOrThrowMissingRoutingNode(allocation, explain, discoNode);
         }

-        final ShardRouting primaryShardRouting;
         try {
-            primaryShardRouting = allocation.routingTable().shardRoutingTable(index, shardId).primaryShard();
+            allocation.routingTable().shardRoutingTable(index, shardId).primaryShard();
         } catch (IndexNotFoundException | ShardNotFoundException e) {
             return explainOrThrowRejectedCommand(explain, allocation, e);
         }
-        if (primaryShardRouting.unassigned()) {
+
+        ShardRouting primaryShardRouting = null;
+        for (RoutingNode node : allocation.routingNodes()) {
+            for (ShardRouting shard : node) {
+                if (shard.getIndexName().equals(index) && shard.getId() == shardId && shard.primary()) {
+                    primaryShardRouting = shard;
+                    break;
+                }
+            }
+        }
+        if (primaryShardRouting == null) {
             return explainOrThrowRejectedCommand(explain, allocation,
                 "trying to allocate a replica shard [" + index + "][" + shardId +
                     "], while corresponding primary shard is still unassigned");
         }

-        List<ShardRouting> replicaShardRoutings =
-            allocation.routingTable().shardRoutingTable(index, shardId).replicaShardsWithState(ShardRoutingState.UNASSIGNED);
+        List<ShardRouting> replicaShardRoutings = new ArrayList<>();
+        for (ShardRouting shard : allocation.routingNodes().unassigned()) {
+            if (shard.getIndexName().equals(index) && shard.getId() == shardId && shard.primary() == false) {
+                replicaShardRoutings.add(shard);
+            }
+        }
+
         ShardRouting shardRouting;
         if (replicaShardRoutings.isEmpty()) {
             return explainOrThrowRejectedCommand(explain, allocation,
@@ -108,13 +108,20 @@ public class AllocateStalePrimaryAllocationCommand extends BasePrimaryAllocation
             return explainOrThrowMissingRoutingNode(allocation, explain, discoNode);
         }

-        final ShardRouting shardRouting;
         try {
-            shardRouting = allocation.routingTable().shardRoutingTable(index, shardId).primaryShard();
+            allocation.routingTable().shardRoutingTable(index, shardId).primaryShard();
         } catch (IndexNotFoundException | ShardNotFoundException e) {
             return explainOrThrowRejectedCommand(explain, allocation, e);
         }
-        if (shardRouting.unassigned() == false) {
+
+        ShardRouting shardRouting = null;
+        for (ShardRouting shard : allocation.routingNodes().unassigned()) {
+            if (shard.getIndexName().equals(index) && shard.getId() == shardId && shard.primary()) {
+                shardRouting = shard;
+                break;
+            }
+        }
+        if (shardRouting == null) {
             return explainOrThrowRejectedCommand(explain, allocation, "primary [" + index + "][" + shardId + "] is already assigned");
         }

@@ -677,4 +677,76 @@ public class AllocationCommandsTests extends ESAllocationTestCase {
         assertEquals("[move_allocation] can't move [test][0] from " + node2 + " to " + node1 +
             ": source [" + node2.getName() + "] is not a data node.", e.getMessage());
     }
+
+    public void testConflictingCommandsInSingleRequest() {
+        AllocationService allocation = createAllocationService(Settings.builder()
+            .put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), "none")
+            .put(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), "none")
+            .build());
+
+        final String index1 = "test1";
+        final String index2 = "test2";
+        final String index3 = "test3";
+        logger.info("--> building initial routing table");
+        MetaData metaData = MetaData.builder()
+            .put(IndexMetaData.builder(index1).settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)
+                .putInSyncAllocationIds(0, Collections.singleton("randomAllocID"))
+                .putInSyncAllocationIds(1, Collections.singleton("randomAllocID2")))
+            .put(IndexMetaData.builder(index2).settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)
+                .putInSyncAllocationIds(0, Collections.singleton("randomAllocID"))
+                .putInSyncAllocationIds(1, Collections.singleton("randomAllocID2")))
+            .put(IndexMetaData.builder(index3).settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)
+                .putInSyncAllocationIds(0, Collections.singleton("randomAllocID"))
+                .putInSyncAllocationIds(1, Collections.singleton("randomAllocID2")))
+            .build();
+        RoutingTable routingTable = RoutingTable.builder()
+            .addAsRecovery(metaData.index(index1))
+            .addAsRecovery(metaData.index(index2))
+            .addAsRecovery(metaData.index(index3))
+            .build();
+        ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
+            .metaData(metaData).routingTable(routingTable).build();
+
+        final String node1 = "node1";
+        final String node2 = "node2";
+        clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder()
+            .add(newNode(node1))
+            .add(newNode(node2))
+        ).build();
+        final ClusterState finalClusterState = allocation.reroute(clusterState, "reroute");
+
+        logger.info("--> allocating same index primary in multiple commands should fail");
+        assertThat(expectThrows(IllegalArgumentException.class, () -> {
+            allocation.reroute(finalClusterState,
+                new AllocationCommands(
+                    new AllocateStalePrimaryAllocationCommand(index1, 0, node1, true),
+                    new AllocateStalePrimaryAllocationCommand(index1, 0, node2, true)
+                ), false, false);
+        }).getMessage(), containsString("primary [" + index1 + "][0] is already assigned"));
+
+        assertThat(expectThrows(IllegalArgumentException.class, () -> {
+            allocation.reroute(finalClusterState,
+                new AllocationCommands(
+                    new AllocateEmptyPrimaryAllocationCommand(index2, 0, node1, true),
+                    new AllocateEmptyPrimaryAllocationCommand(index2, 0, node2, true)
+                ), false, false);
+        }).getMessage(), containsString("primary [" + index2 + "][0] is already assigned"));
+
+
+        clusterState = allocation.reroute(clusterState,
+            new AllocationCommands(new AllocateEmptyPrimaryAllocationCommand(index3, 0, node1, true)), false, false).getClusterState();
+        clusterState = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING));
+
+        final ClusterState updatedClusterState = clusterState;
+        assertThat(updatedClusterState.getRoutingNodes().node(node1).shardsWithState(STARTED).size(), equalTo(1));
+
+        logger.info("--> subsequent replica allocation fails as all configured replicas have been allocated");
+        assertThat(expectThrows(IllegalArgumentException.class, () -> {
+            allocation.reroute(updatedClusterState,
+                new AllocationCommands(
+                    new AllocateReplicaAllocationCommand(index3, 0, node2),
+                    new AllocateReplicaAllocationCommand(index3, 0, node2)
+                ), false, false);
+        }).getMessage(), containsString("all copies of [" + index3 + "][0] are already assigned. Use the move allocation command instead"));
+    }
 }