Allow CCR on nodes with legacy roles only (#60093)

CCR will stop functioning if the master node is on 7.8, but data nodes 
are before that version because the master node considers that all data
nodes do not have the remote cluster client role. This commit allows CCR
work on data nodes with legacy roles only.

Relates #54146
Relates #59375
This commit is contained in:
Nhat Nguyen 2020-07-29 10:57:31 -04:00 committed by GitHub
parent d08e7633f8
commit 9d4a64e749
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 82 additions and 28 deletions

View File

@ -19,6 +19,7 @@
package org.elasticsearch.cluster.node;
import org.elasticsearch.Version;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
@ -172,6 +173,12 @@ public abstract class DiscoveryNodeRole implements Comparable<DiscoveryNodeRole>
public static SortedSet<DiscoveryNodeRole> BUILT_IN_ROLES = Collections.unmodifiableSortedSet(
new TreeSet<>(Arrays.asList(DATA_ROLE, INGEST_ROLE, MASTER_ROLE, REMOTE_CLUSTER_CLIENT_ROLE)));
/**
* The version that {@link #REMOTE_CLUSTER_CLIENT_ROLE} is introduced. Nodes before this version do not have that role even
* they can connect to remote clusters.
*/
public static final Version REMOTE_CLUSTER_CLIENT_ROLE_VERSION = Version.V_7_8_0;
static SortedSet<DiscoveryNodeRole> LEGACY_ROLES =
Collections.unmodifiableSortedSet(new TreeSet<>(Arrays.asList(DATA_ROLE, INGEST_ROLE, MASTER_ROLE)));

View File

@ -32,6 +32,7 @@ import org.elasticsearch.cluster.metadata.AliasMetadata;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.MappingMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
@ -124,14 +125,18 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
@Override
public Assignment getAssignment(final ShardFollowTask params, final ClusterState clusterState) {
final DiscoveryNode node = selectLeastLoadedNode(
clusterState,
DiscoveryNode selectedNode = selectLeastLoadedNode(clusterState,
((Predicate<DiscoveryNode>) DiscoveryNode::isDataNode).and(DiscoveryNode::isRemoteClusterClient)
);
if (node == null) {
if (selectedNode == null) {
// best effort as nodes before 7.8 might not be able to connect to remote clusters
selectedNode = selectLeastLoadedNode(clusterState,
node -> node.isDataNode() && node.getVersion().before(DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE_VERSION));
}
if (selectedNode == null) {
return NO_ASSIGNMENT;
} else {
return new Assignment(node.getId(), "node is the least loaded data node and remote cluster client");
return new Assignment(selectedNode.getId(), "node is the least loaded data node and remote cluster client");
}
}

View File

@ -57,11 +57,14 @@ public final class CcrPrimaryFollowerAllocationDecider extends AllocationDecider
return allocation.decision(Decision.YES, NAME,
"shard is a primary follower but was bootstrapped already; hence is not under the purview of this decider");
}
if (node.node().isRemoteClusterClient() == false) {
return allocation.decision(Decision.NO, NAME, "shard is a primary follower and being bootstrapped, but node does not have the "
+ DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE.roleName() + " role");
if (node.node().isRemoteClusterClient()) {
return allocation.decision(Decision.YES, NAME,
"shard is a primary follower and node has the " + DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE.roleName() + " role");
}
return allocation.decision(Decision.YES, NAME,
"shard is a primary follower and node has the " + DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE.roleName() + " role");
if (node.node().getVersion().before(DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE_VERSION)) {
return allocation.decision(Decision.YES, NAME, "shard is a primary follower and node has only the legacy roles");
}
return allocation.decision(Decision.NO, NAME, "shard is a primary follower and being bootstrapped, but node does not have the "
+ DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE.roleName() + " role");
}
}

View File

@ -18,8 +18,10 @@ import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsModule;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata.Assignment;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ccr.CcrSettings;
@ -38,9 +40,13 @@ public class ShardFollowTasksExecutorAssignmentTests extends ESTestCase {
public void testAssignmentToNodeWithDataAndRemoteClusterClientRoles() {
runAssignmentTest(
new HashSet<>(Arrays.asList(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE)),
randomIntBetween(0, 8),
() -> new HashSet<>(randomSubsetOf(new HashSet<>(Arrays.asList(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.INGEST_ROLE)))),
newNode(
Sets.newHashSet(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE),
VersionUtils.randomVersion(random())),
newNodes(
between(0, 8),
() -> Sets.newHashSet(randomSubsetOf(Arrays.asList(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.INGEST_ROLE))),
Version.CURRENT),
(theSpecial, assignment) -> {
assertTrue(assignment.isAssigned());
assertThat(assignment.getExecutorNode(), equalTo(theSpecial.getId()));
@ -56,11 +62,26 @@ public class ShardFollowTasksExecutorAssignmentTests extends ESTestCase {
runNoAssignmentTest(Collections.singleton(DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE));
}
public void testNodeWithLegacyRolesOnly() {
final Version oldVersion = VersionUtils.randomVersionBetween(random(),
Version.V_6_0_0, VersionUtils.getPreviousVersion(DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE_VERSION));
runAssignmentTest(
newNode(Sets.newHashSet(DiscoveryNodeRole.DATA_ROLE), oldVersion),
newNodes(
between(0, 8),
() -> Sets.newHashSet(randomSubsetOf(Arrays.asList(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.INGEST_ROLE))),
Version.CURRENT),
(theSpecial, assignment) -> {
assertTrue(assignment.isAssigned());
assertThat(assignment.getExecutorNode(), equalTo(theSpecial.getId()));
}
);
}
private void runNoAssignmentTest(final Set<DiscoveryNodeRole> roles) {
runAssignmentTest(
roles,
0,
Collections::emptySet,
newNode(roles, Version.CURRENT),
Collections.emptySet(),
(theSpecial, assignment) -> {
assertFalse(assignment.isAssigned());
assertThat(assignment.getExplanation(), equalTo("no nodes found with data and remote cluster client roles"));
@ -69,9 +90,8 @@ public class ShardFollowTasksExecutorAssignmentTests extends ESTestCase {
}
private void runAssignmentTest(
final Set<DiscoveryNodeRole> theSpecialRoles,
final int numberOfOtherNodes,
final Supplier<Set<DiscoveryNodeRole>> otherNodesRolesSupplier,
final DiscoveryNode targetNode,
final Set<DiscoveryNode> otherNodes,
final BiConsumer<DiscoveryNode, Assignment> consumer
) {
final ClusterService clusterService = mock(ClusterService.class);
@ -82,25 +102,30 @@ public class ShardFollowTasksExecutorAssignmentTests extends ESTestCase {
final ShardFollowTasksExecutor executor =
new ShardFollowTasksExecutor(mock(Client.class), mock(ThreadPool.class), clusterService, settingsModule);
final ClusterState.Builder clusterStateBuilder = ClusterState.builder(new ClusterName("test"));
final DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder();
final DiscoveryNode theSpecial = newNode(theSpecialRoles);
nodesBuilder.add(theSpecial);
for (int i = 0; i < numberOfOtherNodes; i++) {
nodesBuilder.add(newNode(otherNodesRolesSupplier.get()));
final DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder().add(targetNode);
for (DiscoveryNode node : otherNodes) {
nodesBuilder.add(node);
}
clusterStateBuilder.nodes(nodesBuilder);
final Assignment assignment = executor.getAssignment(mock(ShardFollowTask.class), clusterStateBuilder.build());
consumer.accept(theSpecial, assignment);
consumer.accept(targetNode, assignment);
}
private static DiscoveryNode newNode(final Set<DiscoveryNodeRole> roles) {
private static DiscoveryNode newNode(final Set<DiscoveryNodeRole> roles, final Version version) {
return new DiscoveryNode(
"node_" + UUIDs.randomBase64UUID(random()),
buildNewFakeTransportAddress(),
Collections.emptyMap(),
roles,
Version.CURRENT
version
);
}
private static Set<DiscoveryNode> newNodes(int numberOfNodes, Supplier<Set<DiscoveryNodeRole>> rolesSupplier, Version version) {
Set<DiscoveryNode> nodes = new HashSet<>();
for (int i = 0; i < numberOfNodes; i++) {
nodes.add(newNode(rolesSupplier.get(), version));
}
return nodes;
}
}

View File

@ -52,6 +52,7 @@ import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.xpack.ccr.CcrSettings;
import java.util.ArrayList;
@ -60,6 +61,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import static java.util.Collections.emptyMap;
import static org.elasticsearch.cluster.routing.ShardRoutingState.UNASSIGNED;
import static org.hamcrest.Matchers.equalTo;
@ -151,9 +153,10 @@ public class CcrPrimaryFollowerAllocationDeciderTests extends ESAllocationTestCa
IndexMetadata.Builder indexMetadata = IndexMetadata.builder(index)
.settings(settings(Version.CURRENT).put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true))
.numberOfShards(1).numberOfReplicas(1);
DiscoveryNode dataOnlyNode = newNode("d1", Sets.newHashSet(DiscoveryNodeRole.DATA_ROLE));
DiscoveryNode dataAndRemoteNode = newNode("dr1",
final DiscoveryNode dataOnlyNode = newNode("data_role_only", Sets.newHashSet(DiscoveryNodeRole.DATA_ROLE));
final DiscoveryNode dataAndRemoteNode = newNode("data_and_remote_cluster_client_role",
Sets.newHashSet(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE));
final DiscoveryNode nodeWithLegacyRolesOnly = newNodeWithLegacyRoles("legacy_roles_only");
DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(dataOnlyNode).add(dataAndRemoteNode).build();
Metadata metadata = Metadata.builder().put(indexMetadata).build();
RoutingTable.Builder routingTable = RoutingTable.builder()
@ -171,6 +174,11 @@ public class CcrPrimaryFollowerAllocationDeciderTests extends ESAllocationTestCa
Decision yesDecision = executeAllocation(clusterState, shardRouting.primaryShard(), dataAndRemoteNode);
assertThat(yesDecision.type(), equalTo(Decision.Type.YES));
assertThat(yesDecision.getExplanation(), equalTo("shard is a primary follower and node has the remote_cluster_client role"));
yesDecision = executeAllocation(clusterState, shardRouting.primaryShard(), nodeWithLegacyRolesOnly);
assertThat(yesDecision.type(), equalTo(Decision.Type.YES));
assertThat(yesDecision.getExplanation(), equalTo("shard is a primary follower and node has only the legacy roles"));
for (ShardRouting replica : shardRouting.replicaShards()) {
assertThat(replica.state(), equalTo(UNASSIGNED));
yesDecision = executeAllocation(clusterState, replica, randomFrom(dataOnlyNode, dataAndRemoteNode));
@ -181,6 +189,12 @@ public class CcrPrimaryFollowerAllocationDeciderTests extends ESAllocationTestCa
}
}
static DiscoveryNode newNodeWithLegacyRoles(String id) {
final Version version = VersionUtils.randomVersionBetween(random(),
Version.V_6_0_0, VersionUtils.getPreviousVersion(DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE_VERSION));
return new DiscoveryNode(id, buildNewFakeTransportAddress(), emptyMap(), Sets.newHashSet(DiscoveryNodeRole.DATA_ROLE), version);
}
static Decision executeAllocation(ClusterState clusterState, ShardRouting shardRouting, DiscoveryNode node) {
final AllocationDecider decider = new CcrPrimaryFollowerAllocationDecider();
final RoutingAllocation routingAllocation = new RoutingAllocation(new AllocationDeciders(Collections.singletonList(decider)),