[CLIENT] Unknown node version should be a lower bound

Today when we start a `TransportClient` we use the given transport
addresses and create a `DiscoveryNode` from it without knowing the
actual nodes version. We just use the `Version.CURRENT` which is an
upper bound. Yet, the other node might be a version less than the
currently running and serialisation of the nodes info might break. We
should rather use a lower bound here which is the version of the first
release with the same major version as `Version.CURRENT` since this is
what we officially support.

This commit moves to use the minimum major version or an RC / Snapshot
if the current version is a snapshot.

Closes #6894
This commit is contained in:
Simon Willnauer 2014-07-17 14:38:56 +02:00
parent f22f3db30f
commit d79717c341
5 changed files with 74 additions and 6 deletions

View File

@ -453,6 +453,17 @@ public class Version implements Serializable {
return version.id >= id; return version.id >= id;
} }
/**
* Returns the minimum compatible version based on the current
* version. Ie a node needs to have at least the return version in order
* to communicate with a node running the current version. The returned version
* is in most of the cases the smallest major version release unless the current version
* is a beta or RC release then the version itself is returned.
*/
public Version minimumCompatibilityVersion() {
return Version.smallest(this, fromId(major * 1000000 + 99));
}
/** /**
* Just the version number (without -SNAPSHOT if snapshot). * Just the version number (without -SNAPSHOT if snapshot).
*/ */

View File

@ -196,6 +196,10 @@ public class TransportClient extends AbstractClient {
internalClient = injector.getInstance(InternalTransportClient.class); internalClient = injector.getInstance(InternalTransportClient.class);
} }
TransportClientNodesService nodeService() {
return nodesService;
}
/** /**
* Returns the current registered transport addresses to use (added using * Returns the current registered transport addresses to use (added using
* {@link #addTransportAddress(org.elasticsearch.common.transport.TransportAddress)}. * {@link #addTransportAddress(org.elasticsearch.common.transport.TransportAddress)}.

View File

@ -67,7 +67,7 @@ public class TransportClientNodesService extends AbstractComponent {
private final ThreadPool threadPool; private final ThreadPool threadPool;
private final Version version; private final Version minCompatibilityVersion;
// nodes that are added to be discovered // nodes that are added to be discovered
private volatile ImmutableList<DiscoveryNode> listedNodes = ImmutableList.of(); private volatile ImmutableList<DiscoveryNode> listedNodes = ImmutableList.of();
@ -95,7 +95,7 @@ public class TransportClientNodesService extends AbstractComponent {
this.clusterName = clusterName; this.clusterName = clusterName;
this.transportService = transportService; this.transportService = transportService;
this.threadPool = threadPool; this.threadPool = threadPool;
this.version = version; this.minCompatibilityVersion = version.minimumCompatibilityVersion();
this.nodesSamplerInterval = componentSettings.getAsTime("nodes_sampler_interval", timeValueSeconds(5)); this.nodesSamplerInterval = componentSettings.getAsTime("nodes_sampler_interval", timeValueSeconds(5));
this.pingTimeout = componentSettings.getAsTime("ping_timeout", timeValueSeconds(5)).millis(); this.pingTimeout = componentSettings.getAsTime("ping_timeout", timeValueSeconds(5)).millis();
@ -161,7 +161,7 @@ public class TransportClientNodesService extends AbstractComponent {
ImmutableList.Builder<DiscoveryNode> builder = ImmutableList.builder(); ImmutableList.Builder<DiscoveryNode> builder = ImmutableList.builder();
builder.addAll(listedNodes()); builder.addAll(listedNodes());
for (TransportAddress transportAddress : filtered) { for (TransportAddress transportAddress : filtered) {
DiscoveryNode node = new DiscoveryNode("#transport#-" + tempNodeIdGenerator.incrementAndGet(), transportAddress, version); DiscoveryNode node = new DiscoveryNode("#transport#-" + tempNodeIdGenerator.incrementAndGet(), transportAddress, minCompatibilityVersion);
logger.debug("adding address [{}]", node); logger.debug("adding address [{}]", node);
builder.add(node); builder.add(node);
} }

View File

@ -104,4 +104,14 @@ public class VersionTests extends ElasticsearchTestCase {
final Version version = randomFrom(Version.V_0_18_0, Version.V_0_90_13, Version.V_1_3_0); final Version version = randomFrom(Version.V_0_18_0, Version.V_0_90_13, Version.V_1_3_0);
assertEquals(version, Version.indexCreated(ImmutableSettings.builder().put(IndexMetaData.SETTING_UUID, "foo").put(IndexMetaData.SETTING_VERSION_CREATED, version).build())); assertEquals(version, Version.indexCreated(ImmutableSettings.builder().put(IndexMetaData.SETTING_UUID, "foo").put(IndexMetaData.SETTING_VERSION_CREATED, version).build()));
} }
@Test
public void testMinCompatVersion() {
assertThat(Version.V_2_0_0.minimumCompatibilityVersion(), equalTo(Version.V_2_0_0));
assertThat(Version.V_1_3_0.minimumCompatibilityVersion(), equalTo(Version.V_1_0_0));
assertThat(Version.V_1_2_0.minimumCompatibilityVersion(), equalTo(Version.V_1_0_0));
assertThat(Version.V_1_2_3.minimumCompatibilityVersion(), equalTo(Version.V_1_0_0));
assertThat(Version.V_1_0_0_RC2.minimumCompatibilityVersion(), equalTo(Version.V_1_0_0_RC2));
}
} }

View File

@ -19,13 +19,22 @@
package org.elasticsearch.client.transport; package org.elasticsearch.client.transport;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.NodeBuilder;
import org.elasticsearch.node.internal.InternalNode;
import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope; import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import org.hamcrest.Matchers; import org.elasticsearch.transport.TransportService;
import org.junit.Test; import org.junit.Test;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.*; import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.startsWith;
@ClusterScope(scope = Scope.TEST, numDataNodes = 0, transportClientRatio = 1.0) @ClusterScope(scope = Scope.TEST, numDataNodes = 0, transportClientRatio = 1.0)
public class TransportClientTests extends ElasticsearchIntegrationTest { public class TransportClientTests extends ElasticsearchIntegrationTest {
@ -35,7 +44,41 @@ public class TransportClientTests extends ElasticsearchIntegrationTest {
String nodeName = internalCluster().startNode(ImmutableSettings.builder().put("node.data", false)); String nodeName = internalCluster().startNode(ImmutableSettings.builder().put("node.data", false));
TransportClient client = (TransportClient) internalCluster().client(nodeName); TransportClient client = (TransportClient) internalCluster().client(nodeName);
assertThat(client.connectedNodes().get(0).dataNode(), Matchers.equalTo(false)); assertThat(client.connectedNodes().get(0).dataNode(), equalTo(false));
} }
@Test
public void testNodeVersionIsUpdated() {
TransportClient client = (TransportClient) internalCluster().client();
TransportClientNodesService nodeService = client.nodeService();
Node node = NodeBuilder.nodeBuilder().data(false).settings(ImmutableSettings.builder()
.put(internalCluster().getDefaultSettings())
.put("http.enabled", false)
.put("index.store.type", "ram")
.put("config.ignore_system_properties", true) // make sure we get what we set :)
.put("gateway.type", "none")
.build()).clusterName("foobar").build();
node.start();
try {
TransportAddress transportAddress = ((InternalNode) node).injector().getInstance(TransportService.class).boundAddress().publishAddress();
client.addTransportAddress(transportAddress);
assertThat(nodeService.connectedNodes().size(), greaterThanOrEqualTo(1)); // since we force transport clients there has to be one node started that we connect to.
for (DiscoveryNode discoveryNode : nodeService.connectedNodes()) { // connected nodes have updated version
assertThat(discoveryNode.getVersion(), equalTo(Version.CURRENT));
}
for (DiscoveryNode discoveryNode : nodeService.listedNodes()) {
assertThat(discoveryNode.id(), startsWith("#transport#-"));
assertThat(discoveryNode.getVersion(), equalTo(Version.CURRENT.minimumCompatibilityVersion()));
}
assertThat(nodeService.filteredNodes().size(), equalTo(1));
for (DiscoveryNode discoveryNode : nodeService.filteredNodes()) {
assertThat(discoveryNode.getVersion(), equalTo(Version.CURRENT.minimumCompatibilityVersion()));
}
} finally {
node.stop();
}
}
} }