[7.x] Account for node versions during allocation in ILM Shrink (#43300)

This commit ensures that ILM's Shrink action will take node versions into
account when choosing which node to allocate to when shrinking an
index. Prior to this change, ILM could pick a node with a lower version
than some shards are already allocated to, which causes the new
allocation to fail as shards can't be relocated onto a node with a lower
version than they are already on.

As part of this, when making the decision about which node to allocate
to prior to Shrink, all shards in the index are considered, rather than
choosing a random shard to consider.

Further, the unit tests for the logic that chooses a node to allocate
shards to pre-shrink has been improved to validate the behavior in more
realistic and varied initial conditions.
This commit is contained in:
Gordon Brown 2019-06-24 10:02:49 -06:00 committed by GitHub
parent 98d7d231bb
commit fac7efba9a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 300 additions and 39 deletions

View File

@ -5,38 +5,51 @@
*/
package org.elasticsearch.xpack.core.indexlifecycle;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.NodeVersionAllocationDecider;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.shard.ShardId;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
/**
* Allocates all shards in a single index to one node.
* For example, as preparation for shrinking that index.
*/
public class SetSingleNodeAllocateStep extends AsyncActionStep {
private static final Logger logger = LogManager.getLogger(SetSingleNodeAllocateStep.class);
public static final String NAME = "set-single-node-allocation";
private static final AllocationDeciders ALLOCATION_DECIDERS = new AllocationDeciders(Collections.singletonList(
new FilterAllocationDecider(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))));
// These allocation deciders were chosen because these are the conditions that can prevent
// allocation long-term, and that we can inspect in advance. Most other allocation deciders
// will either only delay relocation (e.g. ThrottlingAllocationDecider), or don't work very
// well when reallocating potentially many shards at once (e.g. DiskThresholdDecider)
private static final AllocationDeciders ALLOCATION_DECIDERS = new AllocationDeciders(Arrays.asList(
new FilterAllocationDecider(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)),
new NodeVersionAllocationDecider()
));
public SetSingleNodeAllocateStep(StepKey key, StepKey nextStepKey, Client client) {
super(key, nextStepKey, client);
@ -44,18 +57,24 @@ public class SetSingleNodeAllocateStep extends AsyncActionStep {
@Override
public void performAction(IndexMetaData indexMetaData, ClusterState clusterState, ClusterStateObserver observer, Listener listener) {
RoutingAllocation allocation = new RoutingAllocation(ALLOCATION_DECIDERS, clusterState.getRoutingNodes(), clusterState, null,
final RoutingNodes routingNodes = clusterState.getRoutingNodes();
RoutingAllocation allocation = new RoutingAllocation(ALLOCATION_DECIDERS, routingNodes, clusterState, null,
System.nanoTime());
List<String> validNodeIds = new ArrayList<>();
Optional<ShardRouting> anyShard = clusterState.getRoutingTable().allShards(indexMetaData.getIndex().getName()).stream().findAny();
if (anyShard.isPresent()) {
// Iterate through the nodes finding ones that are acceptable for the current allocation rules of the shard
for (RoutingNode node : clusterState.getRoutingNodes()) {
boolean canRemainOnCurrentNode = ALLOCATION_DECIDERS.canRemain(anyShard.get(), node, allocation)
.type() == Decision.Type.YES;
if (canRemainOnCurrentNode) {
DiscoveryNode discoveryNode = node.node();
validNodeIds.add(discoveryNode.getId());
final Map<ShardId, List<ShardRouting>> routingsByShardId = clusterState.getRoutingTable()
.allShards(indexMetaData.getIndex().getName())
.stream()
.collect(Collectors.groupingBy(ShardRouting::shardId));
if (routingsByShardId.isEmpty() == false) {
for (RoutingNode node : routingNodes) {
boolean canAllocateOneCopyOfEachShard = routingsByShardId.values().stream() // For each shard
.allMatch(shardRoutings -> shardRoutings.stream() // Can we allocate at least one shard copy to this node?
.map(shardRouting -> ALLOCATION_DECIDERS.canAllocate(shardRouting, node, allocation).type())
.anyMatch(Decision.Type.YES::equals));
if (canAllocateOneCopyOfEachShard) {
validNodeIds.add(node.node().getId());
}
}
// Shuffle the list of nodes so the one we pick is random
@ -70,6 +89,7 @@ public class SetSingleNodeAllocateStep extends AsyncActionStep {
ActionListener.wrap(response -> listener.onResponse(true), listener::onFailure));
} else {
// No nodes currently match the allocation rules so just wait until there is one that does
logger.debug("could not find any nodes to allocate index [{}] onto prior to shrink");
listener.onResponse(false);
}
} else {

View File

@ -30,6 +30,7 @@ import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.node.Node;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.xpack.core.indexlifecycle.AsyncActionStep.Listener;
import org.elasticsearch.xpack.core.indexlifecycle.Step.StepKey;
import org.hamcrest.Matchers;
@ -39,14 +40,18 @@ import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
public class SetSingleNodeAllocateStepTests extends AbstractStepTestCase<SetSingleNodeAllocateStep> {
@ -98,13 +103,13 @@ public class SetSingleNodeAllocateStepTests extends AbstractStepTestCase<SetSing
}
public void testPerformActionNoAttrs() throws IOException {
final int numNodes = randomIntBetween(1, 20);
IndexMetaData indexMetaData = IndexMetaData.builder(randomAlphaOfLength(10)).settings(settings(Version.CURRENT))
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, numNodes - 1)).build();
Index index = indexMetaData.getIndex();
Set<String> validNodeIds = new HashSet<>();
Settings validNodeSettings = Settings.EMPTY;
DiscoveryNodes.Builder nodes = DiscoveryNodes.builder();
int numNodes = randomIntBetween(1, 20);
for (int i = 0; i < numNodes; i++) {
String nodeId = "node_id_" + i;
String nodeName = "node_" + i;
@ -120,6 +125,7 @@ public class SetSingleNodeAllocateStepTests extends AbstractStepTestCase<SetSing
public void testPerformActionAttrsAllNodesValid() throws IOException {
int numAttrs = randomIntBetween(1, 10);
final int numNodes = randomIntBetween(1, 20);
String[][] validAttrs = new String[numAttrs][2];
for (int i = 0; i < numAttrs; i++) {
validAttrs[i] = new String[] { randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20) };
@ -129,12 +135,11 @@ public class SetSingleNodeAllocateStepTests extends AbstractStepTestCase<SetSing
indexSettings.put(IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + attr[0], attr[1]);
}
IndexMetaData indexMetaData = IndexMetaData.builder(randomAlphaOfLength(10)).settings(indexSettings)
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, numNodes - 1)).build();
Index index = indexMetaData.getIndex();
Set<String> validNodeIds = new HashSet<>();
Settings validNodeSettings = Settings.EMPTY;
DiscoveryNodes.Builder nodes = DiscoveryNodes.builder();
int numNodes = randomIntBetween(1, 20);
for (int i = 0; i < numNodes; i++) {
String nodeId = "node_id_" + i;
String nodeName = "node_" + i;
@ -150,18 +155,18 @@ public class SetSingleNodeAllocateStepTests extends AbstractStepTestCase<SetSing
}
public void testPerformActionAttrsSomeNodesValid() throws IOException {
final int numNodes = randomIntBetween(1, 20);
String[] validAttr = new String[] { "box_type", "valid" };
String[] invalidAttr = new String[] { "box_type", "not_valid" };
Settings.Builder indexSettings = settings(Version.CURRENT);
indexSettings.put(IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + validAttr[0], validAttr[1]);
IndexMetaData indexMetaData = IndexMetaData.builder(randomAlphaOfLength(10)).settings(indexSettings)
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, numNodes - 1)).build();
Index index = indexMetaData.getIndex();
Set<String> validNodeIds = new HashSet<>();
Settings validNodeSettings = Settings.builder().put(Node.NODE_ATTRIBUTES.getKey() + validAttr[0], validAttr[1]).build();
Settings invalidNodeSettings = Settings.builder().put(Node.NODE_ATTRIBUTES.getKey() + invalidAttr[0], invalidAttr[1]).build();
DiscoveryNodes.Builder nodes = DiscoveryNodes.builder();
int numNodes = randomIntBetween(1, 20);
for (int i = 0; i < numNodes; i++) {
String nodeId = "node_id_" + i;
String nodeName = "node_" + i;
@ -182,16 +187,16 @@ public class SetSingleNodeAllocateStepTests extends AbstractStepTestCase<SetSing
}
public void testPerformActionAttrsNoNodesValid() {
final int numNodes = randomIntBetween(1, 20);
String[] validAttr = new String[] { "box_type", "valid" };
String[] invalidAttr = new String[] { "box_type", "not_valid" };
Settings.Builder indexSettings = settings(Version.CURRENT);
indexSettings.put(IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + validAttr[0], validAttr[1]);
IndexMetaData indexMetaData = IndexMetaData.builder(randomAlphaOfLength(10)).settings(indexSettings)
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, numNodes - 1)).build();
Index index = indexMetaData.getIndex();
Settings invalidNodeSettings = Settings.builder().put(Node.NODE_ATTRIBUTES.getKey() + invalidAttr[0], invalidAttr[1]).build();
DiscoveryNodes.Builder nodes = DiscoveryNodes.builder();
int numNodes = randomIntBetween(1, 20);
for (int i = 0; i < numNodes; i++) {
String nodeId = "node_id_" + i;
String nodeName = "node_" + i;
@ -205,6 +210,7 @@ public class SetSingleNodeAllocateStepTests extends AbstractStepTestCase<SetSing
}
public void testPerformActionAttrsRequestFails() {
final int numNodes = randomIntBetween(1, 20);
int numAttrs = randomIntBetween(1, 10);
Map<String, String> validAttributes = new HashMap<>();
for (int i = 0; i < numAttrs; i++) {
@ -217,12 +223,11 @@ public class SetSingleNodeAllocateStepTests extends AbstractStepTestCase<SetSing
});
IndexMetaData indexMetaData = IndexMetaData.builder(randomAlphaOfLength(10)).settings(indexSettings)
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, numNodes - 1)).build();
Index index = indexMetaData.getIndex();
Set<String> validNodeIds = new HashSet<>();
Settings validNodeSettings = Settings.EMPTY;
DiscoveryNodes.Builder nodes = DiscoveryNodes.builder();
int numNodes = randomIntBetween(1, 20);
for (int i = 0; i < numNodes; i++) {
String nodeId = "node_id_" + i;
String nodeName = "node_" + i;
@ -289,6 +294,7 @@ public class SetSingleNodeAllocateStepTests extends AbstractStepTestCase<SetSing
public void testPerformActionAttrsNoShard() {
int numAttrs = randomIntBetween(1, 10);
final int numNodes = randomIntBetween(1, 20);
String[][] validAttrs = new String[numAttrs][2];
for (int i = 0; i < numAttrs; i++) {
validAttrs[i] = new String[] { randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20) };
@ -298,12 +304,10 @@ public class SetSingleNodeAllocateStepTests extends AbstractStepTestCase<SetSing
indexSettings.put(IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + attr[0], attr[1]);
}
IndexMetaData indexMetaData = IndexMetaData.builder(randomAlphaOfLength(10)).settings(indexSettings)
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, numNodes - 1)).build();
Index index = indexMetaData.getIndex();
Set<String> validNodeIds = new HashSet<>();
Settings validNodeSettings = Settings.EMPTY;
DiscoveryNodes.Builder nodes = DiscoveryNodes.builder();
int numNodes = randomIntBetween(1, 20);
for (int i = 0; i < numNodes; i++) {
String nodeId = "node_id_" + i;
String nodeName = "node_" + i;
@ -312,7 +316,6 @@ public class SetSingleNodeAllocateStepTests extends AbstractStepTestCase<SetSing
Settings nodeSettings = Settings.builder().put(validNodeSettings).put(Node.NODE_NAME_SETTING.getKey(), nodeName)
.put(Node.NODE_ATTRIBUTES.getKey() + nodeAttr[0], nodeAttr[1]).build();
nodes.add(DiscoveryNode.createLocal(nodeSettings, new TransportAddress(TransportAddress.META_ADDRESS, nodePort), nodeId));
validNodeIds.add(nodeId);
}
ImmutableOpenMap.Builder<String, IndexMetaData> indices = ImmutableOpenMap.<String, IndexMetaData> builder().fPut(index.getName(),
@ -344,14 +347,193 @@ public class SetSingleNodeAllocateStepTests extends AbstractStepTestCase<SetSing
Mockito.verifyZeroInteractions(client);
}
public void testPerformActionSomeShardsOnlyOnNewNodes() {
final Version oldVersion = VersionUtils.getPreviousVersion(Version.CURRENT);
final int numNodes = randomIntBetween(2, 20); // Need at least 2 nodes to have some nodes on a new version
final int numNewNodes = randomIntBetween(1, numNodes - 1);
final int numOldNodes = numNodes - numNewNodes;
final int numberOfShards = randomIntBetween(1, 5);
IndexMetaData indexMetaData = IndexMetaData.builder(randomAlphaOfLength(10)).settings(settings(oldVersion))
.numberOfShards(numberOfShards).numberOfReplicas(randomIntBetween(0, numNewNodes - 1)).build();
DiscoveryNodes.Builder nodes = DiscoveryNodes.builder();
Set<String> newNodeIds = new HashSet<>();
for (int i = 0; i < numNewNodes; i++) {
String nodeId = "new_node_id_" + i;
String nodeName = "new_node_" + i;
int nodePort = 9300 + i;
Settings nodeSettings = Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), nodeName).build();
newNodeIds.add(nodeId);
nodes.add(new DiscoveryNode(
Node.NODE_NAME_SETTING.get(nodeSettings),
nodeId,
new TransportAddress(TransportAddress.META_ADDRESS, nodePort),
Node.NODE_ATTRIBUTES.getAsMap(nodeSettings),
DiscoveryNode.getRolesFromSettings(nodeSettings),
Version.CURRENT));
}
Set<String> oldNodeIds = new HashSet<>();
for (int i = 0; i < numOldNodes; i++) {
String nodeId = "old_node_id_" + i;
String nodeName = "old_node_" + i;
int nodePort = 9300 + numNewNodes + i;
Settings nodeSettings = Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), nodeName).build();
oldNodeIds.add(nodeId);
nodes.add(new DiscoveryNode(
Node.NODE_NAME_SETTING.get(nodeSettings),
nodeId,
new TransportAddress(TransportAddress.META_ADDRESS, nodePort),
Node.NODE_ATTRIBUTES.getAsMap(nodeSettings),
DiscoveryNode.getRolesFromSettings(nodeSettings),
oldVersion));
}
Set<String> nodeIds = new HashSet<>();
nodeIds.addAll(newNodeIds);
nodeIds.addAll(oldNodeIds);
DiscoveryNodes discoveryNodes = nodes.build();
IndexRoutingTable.Builder indexRoutingTable = createRoutingTableWithOneShardOnSubset(indexMetaData, newNodeIds, nodeIds);
// Since one shard is already on only new nodes, we should always pick a new node
assertNodeSelected(indexMetaData, indexMetaData.getIndex(), newNodeIds, discoveryNodes, indexRoutingTable.build());
}
public void testPerformActionSomeShardsOnlyOnNewNodesButNewNodesInvalidAttrs() {
final Version oldVersion = VersionUtils.getPreviousVersion(Version.CURRENT);
final int numNodes = randomIntBetween(2, 20); // Need at least 2 nodes to have some nodes on a new version
final int numNewNodes = randomIntBetween(1, numNodes - 1);
final int numOldNodes = numNodes - numNewNodes;
final int numberOfShards = randomIntBetween(1, 5);
final String attribute = "box_type";
final String validAttr = "valid";
final String invalidAttr = "not_valid";
Settings.Builder indexSettings = settings(oldVersion);
indexSettings.put(IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + attribute, validAttr);
IndexMetaData indexMetaData = IndexMetaData.builder(randomAlphaOfLength(10)).settings(indexSettings)
.numberOfShards(numberOfShards).numberOfReplicas(randomIntBetween(0, numNewNodes - 1)).build();
DiscoveryNodes.Builder nodes = DiscoveryNodes.builder();
Set<String> newNodeIds = new HashSet<>();
for (int i = 0; i < numNewNodes; i++) {
String nodeId = "new_node_id_" + i;
String nodeName = "new_node_" + i;
int nodePort = 9300 + i;
Settings nodeSettings = Settings.builder()
.put(Node.NODE_NAME_SETTING.getKey(), nodeName)
.put(Node.NODE_ATTRIBUTES.getKey() + attribute, invalidAttr).build();
newNodeIds.add(nodeId);
nodes.add(new DiscoveryNode(
Node.NODE_NAME_SETTING.get(nodeSettings),
nodeId,
new TransportAddress(TransportAddress.META_ADDRESS, nodePort),
Node.NODE_ATTRIBUTES.getAsMap(nodeSettings),
DiscoveryNode.getRolesFromSettings(nodeSettings),
Version.CURRENT));
}
Set<String> oldNodeIds = new HashSet<>();
for (int i = 0; i < numOldNodes; i++) {
String nodeId = "old_node_id_" + i;
String nodeName = "old_node_" + i;
int nodePort = 9300 + numNewNodes + i;
Settings nodeSettings = Settings.builder()
.put(Node.NODE_NAME_SETTING.getKey(), nodeName)
.put(Node.NODE_ATTRIBUTES.getKey() + attribute, validAttr).build();
oldNodeIds.add(nodeId);
nodes.add(new DiscoveryNode(
Node.NODE_NAME_SETTING.get(nodeSettings),
nodeId,
new TransportAddress(TransportAddress.META_ADDRESS, nodePort),
Node.NODE_ATTRIBUTES.getAsMap(nodeSettings),
DiscoveryNode.getRolesFromSettings(nodeSettings),
oldVersion));
}
Set<String> nodeIds = new HashSet<>();
nodeIds.addAll(newNodeIds);
nodeIds.addAll(oldNodeIds);
DiscoveryNodes discoveryNodes = nodes.build();
IndexRoutingTable.Builder indexRoutingTable = createRoutingTableWithOneShardOnSubset(indexMetaData, newNodeIds, nodeIds);
assertNoValidNode(indexMetaData, indexMetaData.getIndex(), discoveryNodes, indexRoutingTable.build());
}
public void testPerformActionNewShardsExistButWithInvalidAttributes() {
final Version oldVersion = VersionUtils.getPreviousVersion(Version.CURRENT);
final int numNodes = randomIntBetween(2, 20); // Need at least 2 nodes to have some nodes on a new version
final int numNewNodes = randomIntBetween(1, numNodes - 1);
final int numOldNodes = numNodes - numNewNodes;
final int numberOfShards = randomIntBetween(1, 5);
final String attribute = "box_type";
final String validAttr = "valid";
final String invalidAttr = "not_valid";
Settings.Builder indexSettings = settings(oldVersion);
indexSettings.put(IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + attribute, validAttr);
IndexMetaData indexMetaData = IndexMetaData.builder(randomAlphaOfLength(10)).settings(indexSettings)
.numberOfShards(numberOfShards).numberOfReplicas(randomIntBetween(0, numOldNodes - 1)).build();
DiscoveryNodes.Builder nodes = DiscoveryNodes.builder();
Set<String> newNodeIds = new HashSet<>();
for (int i = 0; i < numNewNodes; i++) {
String nodeId = "new_node_id_" + i;
String nodeName = "new_node_" + i;
int nodePort = 9300 + i;
Settings nodeSettings = Settings.builder()
.put(Node.NODE_NAME_SETTING.getKey(), nodeName)
.put(Node.NODE_ATTRIBUTES.getKey() + attribute, invalidAttr).build();
newNodeIds.add(nodeId);
nodes.add(new DiscoveryNode(
Node.NODE_NAME_SETTING.get(nodeSettings),
nodeId,
new TransportAddress(TransportAddress.META_ADDRESS, nodePort),
Node.NODE_ATTRIBUTES.getAsMap(nodeSettings),
DiscoveryNode.getRolesFromSettings(nodeSettings),
Version.CURRENT));
}
Set<String> oldNodeIds = new HashSet<>();
for (int i = 0; i < numOldNodes; i++) {
String nodeId = "old_node_id_" + i;
String nodeName = "old_node_" + i;
int nodePort = 9300 + numNewNodes + i;
Settings nodeSettings = Settings.builder()
.put(Node.NODE_NAME_SETTING.getKey(), nodeName)
.put(Node.NODE_ATTRIBUTES.getKey() + attribute, validAttr).build();
oldNodeIds.add(nodeId);
nodes.add(new DiscoveryNode(
Node.NODE_NAME_SETTING.get(nodeSettings),
nodeId,
new TransportAddress(TransportAddress.META_ADDRESS, nodePort),
Node.NODE_ATTRIBUTES.getAsMap(nodeSettings),
DiscoveryNode.getRolesFromSettings(nodeSettings),
oldVersion));
}
Set<String> nodeIds = new HashSet<>();
nodeIds.addAll(newNodeIds);
nodeIds.addAll(oldNodeIds);
DiscoveryNodes discoveryNodes = nodes.build();
IndexRoutingTable.Builder indexRoutingTable = createRoutingTableWithOneShardOnSubset(indexMetaData, oldNodeIds, oldNodeIds);
assertNodeSelected(indexMetaData, indexMetaData.getIndex(), oldNodeIds, discoveryNodes, indexRoutingTable.build());
}
private void assertNodeSelected(IndexMetaData indexMetaData, Index index,
Set<String> validNodeIds, DiscoveryNodes.Builder nodes) throws IOException {
DiscoveryNodes discoveryNodes = nodes.build();
IndexRoutingTable.Builder indexRoutingTable = createRoutingTable(indexMetaData, index, discoveryNodes);
assertNodeSelected(indexMetaData, index, validNodeIds, discoveryNodes, indexRoutingTable.build());
}
private void assertNodeSelected(IndexMetaData indexMetaData, Index index, Set<String> validNodeIds, DiscoveryNodes nodes,
IndexRoutingTable indexRoutingTable) {
ImmutableOpenMap.Builder<String, IndexMetaData> indices = ImmutableOpenMap.<String, IndexMetaData> builder().fPut(index.getName(),
indexMetaData);
IndexRoutingTable.Builder indexRoutingTable = IndexRoutingTable.builder(index)
.addShard(TestShardRouting.newShardRouting(new ShardId(index, 0), "node_id_0", true, ShardRoutingState.STARTED));
indexMetaData);
ClusterState clusterState = ClusterState.builder(ClusterState.EMPTY_STATE).metaData(MetaData.builder().indices(indices.build()))
.nodes(nodes).routingTable(RoutingTable.builder().add(indexRoutingTable).build()).build();
.nodes(nodes).routingTable(RoutingTable.builder().add(indexRoutingTable).build()).build();
SetSingleNodeAllocateStep step = createRandomInstance();
@ -369,8 +551,8 @@ public class SetSingleNodeAllocateStepTests extends AbstractStepTestCase<SetSing
@SuppressWarnings("unchecked")
ActionListener<AcknowledgedResponse> listener = (ActionListener<AcknowledgedResponse>) invocation.getArguments()[1];
assertSettingsRequestContainsValueFrom(request,
IndexMetaData.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + "_id", validNodeIds, true,
indexMetaData.getIndex().getName());
IndexMetaData.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + "_id", validNodeIds, true,
indexMetaData.getIndex().getName());
listener.onResponse(new AcknowledgedResponse(true));
return null;
}
@ -400,12 +582,18 @@ public class SetSingleNodeAllocateStepTests extends AbstractStepTestCase<SetSing
}
private void assertNoValidNode(IndexMetaData indexMetaData, Index index, DiscoveryNodes.Builder nodes) {
ImmutableOpenMap.Builder<String, IndexMetaData> indices = ImmutableOpenMap.<String, IndexMetaData> builder().fPut(index.getName(),
indexMetaData);
IndexRoutingTable.Builder indexRoutingTable = IndexRoutingTable.builder(index)
.addShard(TestShardRouting.newShardRouting(new ShardId(index, 0), "node_id_0", true, ShardRoutingState.STARTED));
DiscoveryNodes discoveryNodes = nodes.build();
IndexRoutingTable.Builder indexRoutingTable = createRoutingTable(indexMetaData, index, discoveryNodes);
assertNoValidNode(indexMetaData, index, discoveryNodes, indexRoutingTable.build());
}
private void assertNoValidNode(IndexMetaData indexMetaData, Index index, DiscoveryNodes nodes, IndexRoutingTable indexRoutingTable) {
ImmutableOpenMap.Builder<String, IndexMetaData> indices = ImmutableOpenMap.<String, IndexMetaData>builder().fPut(index.getName(),
indexMetaData);
ClusterState clusterState = ClusterState.builder(ClusterState.EMPTY_STATE).metaData(MetaData.builder().indices(indices.build()))
.nodes(nodes).routingTable(RoutingTable.builder().add(indexRoutingTable).build()).build();
.nodes(nodes).routingTable(RoutingTable.builder().add(indexRoutingTable).build()).build();
SetSingleNodeAllocateStep step = createRandomInstance();
@ -429,4 +617,57 @@ public class SetSingleNodeAllocateStepTests extends AbstractStepTestCase<SetSing
Mockito.verifyZeroInteractions(client);
}
private IndexRoutingTable.Builder createRoutingTable(IndexMetaData indexMetaData, Index index, DiscoveryNodes discoveryNodes) {
assertThat(indexMetaData.getNumberOfReplicas(), lessThanOrEqualTo(discoveryNodes.getSize() - 1));
List<String> nodeIds = new ArrayList<>();
for (DiscoveryNode node : discoveryNodes) {
nodeIds.add(node.getId());
}
IndexRoutingTable.Builder indexRoutingTable = IndexRoutingTable.builder(index);
for (int primary = 0; primary < indexMetaData.getNumberOfShards(); primary++) {
Set<String> nodesThisShardCanBePutOn = new HashSet<>(nodeIds);
String currentNode = randomFrom(nodesThisShardCanBePutOn);
nodesThisShardCanBePutOn.remove(currentNode);
indexRoutingTable.addShard(TestShardRouting.newShardRouting(new ShardId(index, primary), currentNode,
true, ShardRoutingState.STARTED));
for (int replica = 0; replica < indexMetaData.getNumberOfReplicas(); replica++) {
assertThat("not enough nodes to allocate all initial shards", nodesThisShardCanBePutOn.size(), greaterThan(0));
String replicaNode = randomFrom(nodesThisShardCanBePutOn);
nodesThisShardCanBePutOn.remove(replicaNode);
indexRoutingTable.addShard(TestShardRouting.newShardRouting(new ShardId(index, primary), replicaNode,
false, ShardRoutingState.STARTED));
}
}
return indexRoutingTable;
}
private IndexRoutingTable.Builder createRoutingTableWithOneShardOnSubset(IndexMetaData indexMetaData, Set<String> subset,
Set<String> allNodeIds) {
IndexRoutingTable.Builder indexRoutingTable = IndexRoutingTable.builder(indexMetaData.getIndex());
final int numberOfShards = indexMetaData.getNumberOfShards();
final int shardOnlyOnNewNodes = randomIntBetween(0, numberOfShards - 1);
for (int primary = 0; primary < indexMetaData.getNumberOfShards(); primary++) {
Set<String> nodesThisShardCanBePutOn;
if (primary == shardOnlyOnNewNodes) {
// This shard should only be allocated to new nodes
nodesThisShardCanBePutOn = new HashSet<>(subset);
} else {
nodesThisShardCanBePutOn = new HashSet<>(allNodeIds);
}
String currentNode = randomFrom(nodesThisShardCanBePutOn);
nodesThisShardCanBePutOn.remove(currentNode);
indexRoutingTable.addShard(TestShardRouting.newShardRouting(new ShardId(indexMetaData.getIndex(), primary), currentNode,
true, ShardRoutingState.STARTED));
for (int replica = 0; replica < indexMetaData.getNumberOfReplicas(); replica++) {
assertThat("not enough nodes to allocate all initial shards", nodesThisShardCanBePutOn.size(), greaterThan(0));
String replicaNode = randomFrom(nodesThisShardCanBePutOn);
nodesThisShardCanBePutOn.remove(replicaNode);
indexRoutingTable.addShard(TestShardRouting.newShardRouting(new ShardId(indexMetaData.getIndex(), primary), replicaNode,
false, ShardRoutingState.STARTED));
}
}
return indexRoutingTable;
}
}