From d79717c3414d9400ca5627d637744967f0f215f8 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Thu, 17 Jul 2014 14:38:56 +0200 Subject: [PATCH] [CLIENT] Unknown node version should be a lower bound Today when we start a `TransportClient` we use the given transport addresses and create a `DiscoveryNode` from it without knowing the actual nodes version. We just use the `Version.CURRENT` which is an upper bound. Yet, the other node might be a version less than the currently running and serialisation of the nodes info might break. We should rather use a lower bound here which is the version of the first release with the same major version as `Version.CURRENT` since this is what we officially support. This commit moves to use the minimum major version or an RC / Snapshot if the current version is a snapshot. Closes #6894 --- src/main/java/org/elasticsearch/Version.java | 11 +++++ .../client/transport/TransportClient.java | 4 ++ .../TransportClientNodesService.java | 6 +-- .../java/org/elasticsearch/VersionTests.java | 10 ++++ .../transport/TransportClientTests.java | 49 +++++++++++++++++-- 5 files changed, 74 insertions(+), 6 deletions(-) diff --git a/src/main/java/org/elasticsearch/Version.java b/src/main/java/org/elasticsearch/Version.java index 49b2ea10cfa..a1ffb6a2f49 100644 --- a/src/main/java/org/elasticsearch/Version.java +++ b/src/main/java/org/elasticsearch/Version.java @@ -453,6 +453,17 @@ public class Version implements Serializable { return version.id >= id; } + /** + * Returns the minimum compatible version based on the current + * version. Ie a node needs to have at least the return version in order + * to communicate with a node running the current version. The returned version + * is in most of the cases the smallest major version release unless the current version + * is a beta or RC release then the version itself is returned. + */ + public Version minimumCompatibilityVersion() { + return Version.smallest(this, fromId(major * 1000000 + 99)); + } + /** * Just the version number (without -SNAPSHOT if snapshot). */ diff --git a/src/main/java/org/elasticsearch/client/transport/TransportClient.java b/src/main/java/org/elasticsearch/client/transport/TransportClient.java index 3013db50536..f3957f8b3b3 100644 --- a/src/main/java/org/elasticsearch/client/transport/TransportClient.java +++ b/src/main/java/org/elasticsearch/client/transport/TransportClient.java @@ -196,6 +196,10 @@ public class TransportClient extends AbstractClient { internalClient = injector.getInstance(InternalTransportClient.class); } + TransportClientNodesService nodeService() { + return nodesService; + } + /** * Returns the current registered transport addresses to use (added using * {@link #addTransportAddress(org.elasticsearch.common.transport.TransportAddress)}. diff --git a/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java b/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java index 3174d9ff42f..0fa28baa951 100644 --- a/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java +++ b/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java @@ -67,7 +67,7 @@ public class TransportClientNodesService extends AbstractComponent { private final ThreadPool threadPool; - private final Version version; + private final Version minCompatibilityVersion; // nodes that are added to be discovered private volatile ImmutableList listedNodes = ImmutableList.of(); @@ -95,7 +95,7 @@ public class TransportClientNodesService extends AbstractComponent { this.clusterName = clusterName; this.transportService = transportService; this.threadPool = threadPool; - this.version = version; + this.minCompatibilityVersion = version.minimumCompatibilityVersion(); this.nodesSamplerInterval = componentSettings.getAsTime("nodes_sampler_interval", timeValueSeconds(5)); this.pingTimeout = componentSettings.getAsTime("ping_timeout", timeValueSeconds(5)).millis(); @@ -161,7 +161,7 @@ public class TransportClientNodesService extends AbstractComponent { ImmutableList.Builder builder = ImmutableList.builder(); builder.addAll(listedNodes()); for (TransportAddress transportAddress : filtered) { - DiscoveryNode node = new DiscoveryNode("#transport#-" + tempNodeIdGenerator.incrementAndGet(), transportAddress, version); + DiscoveryNode node = new DiscoveryNode("#transport#-" + tempNodeIdGenerator.incrementAndGet(), transportAddress, minCompatibilityVersion); logger.debug("adding address [{}]", node); builder.add(node); } diff --git a/src/test/java/org/elasticsearch/VersionTests.java b/src/test/java/org/elasticsearch/VersionTests.java index cc6045f57ca..547ae3d9970 100644 --- a/src/test/java/org/elasticsearch/VersionTests.java +++ b/src/test/java/org/elasticsearch/VersionTests.java @@ -104,4 +104,14 @@ public class VersionTests extends ElasticsearchTestCase { final Version version = randomFrom(Version.V_0_18_0, Version.V_0_90_13, Version.V_1_3_0); assertEquals(version, Version.indexCreated(ImmutableSettings.builder().put(IndexMetaData.SETTING_UUID, "foo").put(IndexMetaData.SETTING_VERSION_CREATED, version).build())); } + + @Test + public void testMinCompatVersion() { + assertThat(Version.V_2_0_0.minimumCompatibilityVersion(), equalTo(Version.V_2_0_0)); + assertThat(Version.V_1_3_0.minimumCompatibilityVersion(), equalTo(Version.V_1_0_0)); + assertThat(Version.V_1_2_0.minimumCompatibilityVersion(), equalTo(Version.V_1_0_0)); + assertThat(Version.V_1_2_3.minimumCompatibilityVersion(), equalTo(Version.V_1_0_0)); + assertThat(Version.V_1_0_0_RC2.minimumCompatibilityVersion(), equalTo(Version.V_1_0_0_RC2)); + } + } \ No newline at end of file diff --git a/src/test/java/org/elasticsearch/client/transport/TransportClientTests.java b/src/test/java/org/elasticsearch/client/transport/TransportClientTests.java index 3117888f94c..8f2f90fe03f 100644 --- a/src/test/java/org/elasticsearch/client/transport/TransportClientTests.java +++ b/src/test/java/org/elasticsearch/client/transport/TransportClientTests.java @@ -19,13 +19,22 @@ package org.elasticsearch.client.transport; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.node.Node; +import org.elasticsearch.node.NodeBuilder; +import org.elasticsearch.node.internal.InternalNode; import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope; -import org.hamcrest.Matchers; +import org.elasticsearch.transport.TransportService; import org.junit.Test; -import static org.elasticsearch.test.ElasticsearchIntegrationTest.*; +import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.startsWith; @ClusterScope(scope = Scope.TEST, numDataNodes = 0, transportClientRatio = 1.0) public class TransportClientTests extends ElasticsearchIntegrationTest { @@ -35,7 +44,41 @@ public class TransportClientTests extends ElasticsearchIntegrationTest { String nodeName = internalCluster().startNode(ImmutableSettings.builder().put("node.data", false)); TransportClient client = (TransportClient) internalCluster().client(nodeName); - assertThat(client.connectedNodes().get(0).dataNode(), Matchers.equalTo(false)); + assertThat(client.connectedNodes().get(0).dataNode(), equalTo(false)); } + + @Test + public void testNodeVersionIsUpdated() { + TransportClient client = (TransportClient) internalCluster().client(); + TransportClientNodesService nodeService = client.nodeService(); + Node node = NodeBuilder.nodeBuilder().data(false).settings(ImmutableSettings.builder() + .put(internalCluster().getDefaultSettings()) + .put("http.enabled", false) + .put("index.store.type", "ram") + .put("config.ignore_system_properties", true) // make sure we get what we set :) + .put("gateway.type", "none") + .build()).clusterName("foobar").build(); + node.start(); + try { + TransportAddress transportAddress = ((InternalNode) node).injector().getInstance(TransportService.class).boundAddress().publishAddress(); + client.addTransportAddress(transportAddress); + assertThat(nodeService.connectedNodes().size(), greaterThanOrEqualTo(1)); // since we force transport clients there has to be one node started that we connect to. + for (DiscoveryNode discoveryNode : nodeService.connectedNodes()) { // connected nodes have updated version + assertThat(discoveryNode.getVersion(), equalTo(Version.CURRENT)); + } + + for (DiscoveryNode discoveryNode : nodeService.listedNodes()) { + assertThat(discoveryNode.id(), startsWith("#transport#-")); + assertThat(discoveryNode.getVersion(), equalTo(Version.CURRENT.minimumCompatibilityVersion())); + } + + assertThat(nodeService.filteredNodes().size(), equalTo(1)); + for (DiscoveryNode discoveryNode : nodeService.filteredNodes()) { + assertThat(discoveryNode.getVersion(), equalTo(Version.CURRENT.minimumCompatibilityVersion())); + } + } finally { + node.stop(); + } + } }