diff --git a/qa/rolling-upgrade/src/test/java/org/opensearch/upgrades/RefreshVersionInClusterStateIT.java b/qa/rolling-upgrade/src/test/java/org/opensearch/upgrades/RefreshVersionInClusterStateIT.java new file mode 100644 index 00000000000..b6945047300 --- /dev/null +++ b/qa/rolling-upgrade/src/test/java/org/opensearch/upgrades/RefreshVersionInClusterStateIT.java @@ -0,0 +1,39 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.upgrades; + +import org.opensearch.Version; +import org.opensearch.client.Request; +import org.opensearch.client.Response; +import org.opensearch.common.io.Streams; + +import java.io.IOException; + +public class RefreshVersionInClusterStateIT extends AbstractRollingTestCase { + + /* + This test ensures that after the upgrade from ElasticSearch/ OpenSearch all nodes report the version on and after 1.0.0 + */ + public void testRefresh() throws IOException { + switch (CLUSTER_TYPE) { + case OLD: + case MIXED: + break; + case UPGRADED: + Response response = client().performRequest(new Request("GET", "/_cat/nodes?h=id,version")); + for (String nodeLine : Streams.readAllLines(response.getEntity().getContent())) { + String[] elements = nodeLine.split(" +"); + assertEquals(Version.fromString(elements[1]), Version.CURRENT); + } + break; + default: + throw new UnsupportedOperationException("Unknown cluster type [" + CLUSTER_TYPE + "]"); + } + } +} diff --git a/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java b/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java index a89c52364b7..4d083f55c1c 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java @@ -121,7 +121,8 @@ public class JoinHelper { this.transportService = transportService; this.nodeHealthService = nodeHealthService; this.joinTimeout = JOIN_TIMEOUT_SETTING.get(settings); - this.joinTaskExecutorGenerator = () -> new JoinTaskExecutor(settings, allocationService, logger, rerouteService) { + this.joinTaskExecutorGenerator = () -> new JoinTaskExecutor(settings, allocationService, logger, rerouteService, + transportService) { private final long term = currentTermSupplier.getAsLong(); diff --git a/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java b/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java index 59409336945..ac725e613e0 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java @@ -32,6 +32,7 @@ package org.opensearch.cluster.coordination; import org.apache.logging.log4j.Logger; +import org.opensearch.LegacyESVersion; import org.opensearch.Version; import org.opensearch.action.ActionListener; import org.opensearch.cluster.ClusterState; @@ -48,6 +49,7 @@ import org.opensearch.common.Priority; import org.opensearch.common.settings.Settings; import org.opensearch.discovery.zen.ElectMasterService; import org.opensearch.persistent.PersistentTasksCustomMetadata; +import org.opensearch.transport.TransportService; import java.util.ArrayList; import java.util.Collection; @@ -67,6 +69,7 @@ public class JoinTaskExecutor implements ClusterStateTaskExecutor channelVersions = transportService.getChannelVersion(currentNodes); + for (DiscoveryNode node : currentNodes) { + if(channelVersions.containsKey(node.getId())) { + if (channelVersions.get(node.getId()) != node.getVersion()) { + DiscoveryNode tmpNode = nodesBuilder.get(node.getId()); + nodesBuilder.remove(node.getId()); + nodesBuilder.add(new DiscoveryNode(tmpNode.getName(), tmpNode.getId(), tmpNode.getEphemeralId(), + tmpNode.getHostName(), tmpNode.getHostAddress(), tmpNode.getAddress(), tmpNode.getAttributes(), + tmpNode.getRoles(), channelVersions.get(tmpNode.getId()))); + logger.info("Refreshed the DiscoveryNode version for node {}:{} from {} to {}", + node.getId(), node.getAddress(), node.getVersion(), channelVersions.get(tmpNode.getId())); + } + } else { + // in case existing OpenSearch node is present in the cluster and but there is no connection to that node yet, + // either that node will send new JoinRequest to the master with version >=1.0, then no issue or + // there is an edge case if doesn't send JoinRequest and connection is established, + // then it can continue to report version as 7.10.2 instead of actual OpenSearch version. So, + // removing the node from cluster state to prevent stale version reporting and let it reconnect. + if (node.getVersion().equals(LegacyESVersion.V_7_10_2)) { + nodesBuilder.remove(node.getId()); + } + } + } + } + } + @Override public boolean runOnlyOnMaster() { // we validate that we are allowed to change the cluster state during cluster state processing diff --git a/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java b/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java index 96d940fa940..a1783f7961f 100644 --- a/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java +++ b/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java @@ -367,7 +367,7 @@ public class DiscoveryNode implements Writeable, ToXContentFragment { } } } - if (out.getVersion().before(Version.V_1_0_0)) { + if (out.getVersion().before(Version.V_1_0_0) && version.onOrAfter(Version.V_1_0_0)) { Version.writeVersion(LegacyESVersion.V_7_10_2, out); } else { Version.writeVersion(version, out); diff --git a/server/src/main/java/org/opensearch/cluster/node/DiscoveryNodes.java b/server/src/main/java/org/opensearch/cluster/node/DiscoveryNodes.java index c4f70c53eea..b7d9b2626f0 100644 --- a/server/src/main/java/org/opensearch/cluster/node/DiscoveryNodes.java +++ b/server/src/main/java/org/opensearch/cluster/node/DiscoveryNodes.java @@ -35,6 +35,7 @@ package org.opensearch.cluster.node; import com.carrotsearch.hppc.ObjectHashSet; import com.carrotsearch.hppc.cursors.ObjectCursor; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; +import org.opensearch.LegacyESVersion; import org.opensearch.Version; import org.opensearch.cluster.AbstractDiffable; import org.opensearch.cluster.Diff; @@ -226,6 +227,16 @@ public class DiscoveryNodes extends AbstractDiffable implements return existing != null && existing.equals(discoveryNode) && existing.getRoles().equals(discoveryNode.getRoles()); } + /** + * Determine if the given node exists and has the right version. During upgrade from Elasticsearch version as OpenSearch node run in + * BWC mode and can have the version as 7.10.2 in cluster state from older master to OpenSearch master. + */ + public boolean nodeExistsWithBWCVersion(DiscoveryNode discoveryNode) { + final DiscoveryNode existing = nodes.get(discoveryNode.getId()); + return existing != null && existing.equals(discoveryNode) && + existing.getVersion().equals(LegacyESVersion.V_7_10_2) && discoveryNode.getVersion().onOrAfter(Version.V_1_0_0); + } + /** * Get the id of the master node * diff --git a/server/src/main/java/org/opensearch/discovery/zen/NodeJoinController.java b/server/src/main/java/org/opensearch/discovery/zen/NodeJoinController.java index b576df87f66..1370d79ef55 100644 --- a/server/src/main/java/org/opensearch/discovery/zen/NodeJoinController.java +++ b/server/src/main/java/org/opensearch/discovery/zen/NodeJoinController.java @@ -77,7 +77,7 @@ public class NodeJoinController { public NodeJoinController(Settings settings, MasterService masterService, AllocationService allocationService, ElectMasterService electMaster, RerouteService rerouteService) { this.masterService = masterService; - joinTaskExecutor = new JoinTaskExecutor(settings, allocationService, logger, rerouteService) { + joinTaskExecutor = new JoinTaskExecutor(settings, allocationService, logger, rerouteService, null) { @Override public void clusterStatePublished(ClusterChangedEvent event) { electMaster.logMinimumMasterNodesWarningIfNecessary(event.previousState(), event.state()); diff --git a/server/src/main/java/org/opensearch/transport/TransportService.java b/server/src/main/java/org/opensearch/transport/TransportService.java index ce92872922e..a07e65092cc 100644 --- a/server/src/main/java/org/opensearch/transport/TransportService.java +++ b/server/src/main/java/org/opensearch/transport/TransportService.java @@ -44,6 +44,7 @@ import org.opensearch.client.Client; import org.opensearch.client.transport.TransportClient; import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.common.Nullable; import org.opensearch.common.Strings; import org.opensearch.common.component.AbstractLifecycleComponent; @@ -74,6 +75,7 @@ import java.io.UncheckedIOException; import java.net.UnknownHostException; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; @@ -705,6 +707,18 @@ public class TransportService extends AbstractLifecycleComponent implements Repo } } + public Map getChannelVersion(DiscoveryNodes nodes) { + Map nodeChannelVersions = new HashMap<>(nodes.getSize()); + for (DiscoveryNode node: nodes) { + try { + nodeChannelVersions.putIfAbsent(node.getId(), connectionManager.getConnection(node).getVersion()); + } catch (Exception e) { + // ignore in case node is not connected + } + } + return nodeChannelVersions; + } + public final void sendChildRequest(final DiscoveryNode node, final String action, final TransportRequest request, final Task parentTask, final TransportRequestOptions options, diff --git a/server/src/test/java/org/opensearch/cluster/coordination/JoinTaskExecutorTests.java b/server/src/test/java/org/opensearch/cluster/coordination/JoinTaskExecutorTests.java index 4ed32e837bd..a7998d95a52 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/JoinTaskExecutorTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/JoinTaskExecutorTests.java @@ -47,9 +47,13 @@ import org.opensearch.common.collect.List; import org.opensearch.common.settings.Settings; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.VersionUtils; +import org.opensearch.transport.TransportService; +import java.util.HashMap; import java.util.HashSet; +import java.util.Map; +import static org.mockito.Matchers.anyBoolean; import static org.opensearch.test.VersionUtils.getPreviousVersion; import static org.opensearch.test.VersionUtils.incompatibleFutureVersion; import static org.opensearch.test.VersionUtils.maxCompatibleVersion; @@ -173,7 +177,7 @@ public class JoinTaskExecutorTests extends OpenSearchTestCase { when(allocationService.adaptAutoExpandReplicas(any())).then(invocationOnMock -> invocationOnMock.getArguments()[0]); final RerouteService rerouteService = (reason, priority, listener) -> listener.onResponse(null); - final JoinTaskExecutor joinTaskExecutor = new JoinTaskExecutor(Settings.EMPTY, allocationService, logger, rerouteService); + final JoinTaskExecutor joinTaskExecutor = new JoinTaskExecutor(Settings.EMPTY, allocationService, logger, rerouteService, null); final DiscoveryNode masterNode = new DiscoveryNode(UUIDs.base64UUID(), buildNewFakeTransportAddress(), Version.CURRENT); @@ -196,4 +200,61 @@ public class JoinTaskExecutorTests extends OpenSearchTestCase { assertThat(result.resultingState.getNodes().get(actualNode.getId()).getRoles(), equalTo(actualNode.getRoles())); } + + public void testUpdatesNodeWithOpenSearchVersionForExistingAndNewNodes() throws Exception { + // During the upgrade from Elasticsearch, OpenSearch node send their version as 7.10.2 to Elasticsearch master + // in order to successfully join the cluster. But as soon as OpenSearch node becomes the master, cluster state + // should show the OpenSearch nodes version as 1.x. As the cluster state was carry forwarded from ES master, + // version in DiscoveryNode is stale 7.10.2. + final AllocationService allocationService = mock(AllocationService.class); + when(allocationService.adaptAutoExpandReplicas(any())).then(invocationOnMock -> invocationOnMock.getArguments()[0]); + when(allocationService.disassociateDeadNodes(any(), anyBoolean(), any())).then( + invocationOnMock -> invocationOnMock.getArguments()[0]); + final RerouteService rerouteService = (reason, priority, listener) -> listener.onResponse(null); + Map channelVersions = new HashMap<>(); + String node_1 = UUIDs.base64UUID(); // OpenSearch node running BWC version + String node_2 = UUIDs.base64UUID(); // OpenSearch node running BWC version + String node_3 = UUIDs.base64UUID(); // OpenSearch node running BWC version, sending new join request and no active channel + String node_4 = UUIDs.base64UUID(); // ES node 7.10.2 + String node_5 = UUIDs.base64UUID(); // ES node 7.10.2 in cluster but missing channel version + String node_6 = UUIDs.base64UUID(); // ES node 7.9.0 + String node_7 = UUIDs.base64UUID(); // ES node 7.9.0 in cluster but missing channel version + channelVersions.put(node_1, LegacyESVersion.CURRENT); + channelVersions.put(node_2, LegacyESVersion.CURRENT); + channelVersions.put(node_4, LegacyESVersion.V_7_10_2); + channelVersions.put(node_6, LegacyESVersion.V_7_9_0); + + final TransportService transportService = mock(TransportService.class); + when(transportService.getChannelVersion(any())).thenReturn(channelVersions); + DiscoveryNodes.Builder nodes = new DiscoveryNodes.Builder().localNodeId(node_1); + nodes.add(new DiscoveryNode(node_1, buildNewFakeTransportAddress(), LegacyESVersion.V_7_10_2)); + nodes.add(new DiscoveryNode(node_2, buildNewFakeTransportAddress(), LegacyESVersion.V_7_10_2)); + nodes.add(new DiscoveryNode(node_3, buildNewFakeTransportAddress(), LegacyESVersion.V_7_10_2)); + nodes.add(new DiscoveryNode(node_4, buildNewFakeTransportAddress(), LegacyESVersion.V_7_10_2)); + nodes.add(new DiscoveryNode(node_5, buildNewFakeTransportAddress(), LegacyESVersion.V_7_10_2)); + nodes.add(new DiscoveryNode(node_6, buildNewFakeTransportAddress(), LegacyESVersion.V_7_9_0)); + nodes.add(new DiscoveryNode(node_7, buildNewFakeTransportAddress(), LegacyESVersion.V_7_9_0)); + final ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).nodes(nodes).build(); + final JoinTaskExecutor joinTaskExecutor = new JoinTaskExecutor(Settings.EMPTY, allocationService, logger, + rerouteService, transportService); + final DiscoveryNode existing_node_3 = clusterState.nodes().get(node_3); + final DiscoveryNode node_3_new_join = new DiscoveryNode(existing_node_3.getName(), existing_node_3.getId(), + existing_node_3.getEphemeralId(), existing_node_3.getHostName(), existing_node_3.getHostAddress(), + existing_node_3.getAddress(), existing_node_3.getAttributes(), existing_node_3.getRoles(), Version.CURRENT); + + final ClusterStateTaskExecutor.ClusterTasksResult result + = joinTaskExecutor.execute(clusterState, List.of(new JoinTaskExecutor.Task(node_3_new_join, "test"), + JoinTaskExecutor.newBecomeMasterTask(), JoinTaskExecutor.newFinishElectionTask())); + final ClusterStateTaskExecutor.TaskResult taskResult = result.executionResults.values().iterator().next(); + assertTrue(taskResult.isSuccess()); + DiscoveryNodes resultNodes = result.resultingState.getNodes(); + assertEquals(resultNodes.get(node_1).getVersion(), Version.CURRENT); + assertEquals(resultNodes.get(node_2).getVersion(), Version.CURRENT); + assertEquals(resultNodes.get(node_3).getVersion(), Version.CURRENT); // 7.10.2 in old state but sent new join and processed + assertEquals(resultNodes.get(node_4).getVersion(), LegacyESVersion.V_7_10_2); + assertFalse(resultNodes.nodeExists(node_5)); // 7.10.2 node without active channel will be removed and should rejoin + assertEquals(resultNodes.get(node_6).getVersion(), LegacyESVersion.V_7_9_0); + // 7.9.0 node without active channel but shouldn't get removed + assertEquals(resultNodes.get(node_7).getVersion(), LegacyESVersion.V_7_9_0); + } } diff --git a/server/src/test/java/org/opensearch/cluster/node/DiscoveryNodesTests.java b/server/src/test/java/org/opensearch/cluster/node/DiscoveryNodesTests.java index 7de5f45bf58..b3e0313a2a1 100644 --- a/server/src/test/java/org/opensearch/cluster/node/DiscoveryNodesTests.java +++ b/server/src/test/java/org/opensearch/cluster/node/DiscoveryNodesTests.java @@ -420,4 +420,30 @@ public class DiscoveryNodesTests extends OpenSearchTestCase { assertEquals( Version.fromString("1.1.0"), build.getMaxNodeVersion()); assertEquals( LegacyESVersion.fromString("5.1.0"), build.getMinNodeVersion()); } + + public void testNodeExistsWithBWCVersion() { + DiscoveryNodes.Builder discoBuilder = DiscoveryNodes.builder(); + DiscoveryNode node_1 = new DiscoveryNode("name_" + 1, "node_" + 1, buildNewFakeTransportAddress(), Collections.emptyMap(), + new HashSet<>(randomSubsetOf(DiscoveryNodeRole.BUILT_IN_ROLES)), + LegacyESVersion.fromString("7.10.2")); + DiscoveryNode node_2 = new DiscoveryNode("name_" + 2, "node_" + 2, buildNewFakeTransportAddress(), Collections.emptyMap(), + new HashSet<>(randomSubsetOf(DiscoveryNodeRole.BUILT_IN_ROLES)), + LegacyESVersion.fromString("7.9.0")); + discoBuilder.add(node_1); + discoBuilder.add(node_2); + + discoBuilder.localNodeId("name_1"); + discoBuilder.masterNodeId("name_2"); + DiscoveryNodes build = discoBuilder.build(); + assertTrue(build.nodeExistsWithBWCVersion(buildDiscoveryNodeFromExisting(node_1, Version.CURRENT))); + assertFalse(build.nodeExistsWithBWCVersion(buildDiscoveryNodeFromExisting(node_2, + LegacyESVersion.fromString("7.10.2")))); + assertFalse(build.nodeExistsWithBWCVersion(buildDiscoveryNodeFromExisting(node_1, + LegacyESVersion.fromString("6.8.0")))); + } + + private DiscoveryNode buildDiscoveryNodeFromExisting(DiscoveryNode existing, Version newVersion) { + return new DiscoveryNode(existing.getName(), existing.getId(), existing.getEphemeralId(), existing.getHostName(), + existing.getHostAddress(), existing.getAddress(), existing.getAttributes(), existing.getRoles(), newVersion); + } } diff --git a/server/src/test/java/org/opensearch/indices/cluster/ClusterStateChanges.java b/server/src/test/java/org/opensearch/indices/cluster/ClusterStateChanges.java index a86ea853e5b..f49db009fe3 100644 --- a/server/src/test/java/org/opensearch/indices/cluster/ClusterStateChanges.java +++ b/server/src/test/java/org/opensearch/indices/cluster/ClusterStateChanges.java @@ -250,7 +250,7 @@ public class ClusterStateChanges { transportService, clusterService, threadPool, createIndexService, actionFilters, indexNameExpressionResolver); nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, logger); - joinTaskExecutor = new JoinTaskExecutor(Settings.EMPTY, allocationService, logger, (s, p, r) -> {}); + joinTaskExecutor = new JoinTaskExecutor(Settings.EMPTY, allocationService, logger, (s, p, r) -> {}, transportService); } public ClusterState createIndex(ClusterState state, CreateIndexRequest request) {