AwsEc2UnicastHostsProvider should use version.minimumCompatibilityVersion()

The AwsEc2UnicastHostsProvider creates DiscoveryNodes that are used as an initial seed for unicast based discovery. At the moment it uses Version.CURRENT (see [1]) for those DiscoveryNode object, which confuses the backwards compatibility layer to think this nodes are of the latest version. This causes new nodes to fail to join old nodes as the ping serialization goes wrong. Instead we should use version.minimumCompatibilityVersion(). See [2]

[1] https://github.com/elasticsearch/elasticsearch-cloud-aws/blob/es-1.x/src/main/java/org/elasticsearch/discovery/ec2/AwsEc2UnicastHostsProvider.java#L165
[2] https://github.com/elasticsearch/elasticsearch/blob/master/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java#L130

Ps. this was reported on the mailing list. See: https://groups.google.com/forum/#!msg/elasticsearch/8pUwFld88tI/7jRuG6hqtbAJ

Closes #143.
This commit is contained in:
David Pilato 2014-11-24 15:54:01 +01:00
parent 3f8a053da8
commit 66cedb350e
2 changed files with 9 additions and 4 deletions

View File

@ -59,6 +59,8 @@ public class AwsEc2UnicastHostsProvider extends AbstractComponent implements Uni
private final AmazonEC2 client; private final AmazonEC2 client;
private final Version version;
private final boolean bindAnyGroup; private final boolean bindAnyGroup;
private final ImmutableSet<String> groups; private final ImmutableSet<String> groups;
@ -70,10 +72,11 @@ public class AwsEc2UnicastHostsProvider extends AbstractComponent implements Uni
private final HostType hostType; private final HostType hostType;
@Inject @Inject
public AwsEc2UnicastHostsProvider(Settings settings, TransportService transportService, AmazonEC2 client) { public AwsEc2UnicastHostsProvider(Settings settings, TransportService transportService, AmazonEC2 client, Version version) {
super(settings); super(settings);
this.transportService = transportService; this.transportService = transportService;
this.client = client; this.client = client;
this.version = version;
this.hostType = HostType.valueOf(componentSettings.get("host_type", "private_ip").toUpperCase()); this.hostType = HostType.valueOf(componentSettings.get("host_type", "private_ip").toUpperCase());
@ -162,7 +165,7 @@ public class AwsEc2UnicastHostsProvider extends AbstractComponent implements Uni
// we only limit to 1 addresses, makes no sense to ping 100 ports // we only limit to 1 addresses, makes no sense to ping 100 ports
for (int i = 0; (i < addresses.length && i < UnicastZenPing.LIMIT_PORTS_COUNT); i++) { for (int i = 0; (i < addresses.length && i < UnicastZenPing.LIMIT_PORTS_COUNT); i++) {
logger.trace("adding {}, address {}, transport_address {}", instance.getInstanceId(), address, addresses[i]); logger.trace("adding {}, address {}, transport_address {}", instance.getInstanceId(), address, addresses[i]);
discoNodes.add(new DiscoveryNode("#cloud-" + instance.getInstanceId() + "-" + i, addresses[i], Version.CURRENT)); discoNodes.add(new DiscoveryNode("#cloud-" + instance.getInstanceId() + "-" + i, addresses[i], version.minimumCompatibilityVersion()));
} }
} catch (Exception e) { } catch (Exception e) {
logger.warn("failed ot add {}, address {}", e, instance.getInstanceId(), address); logger.warn("failed ot add {}, address {}", e, instance.getInstanceId(), address);

View File

@ -19,6 +19,7 @@
package org.elasticsearch.discovery.ec2; package org.elasticsearch.discovery.ec2;
import org.elasticsearch.Version;
import org.elasticsearch.cloud.aws.AwsEc2Service; import org.elasticsearch.cloud.aws.AwsEc2Service;
import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterService;
@ -46,7 +47,8 @@ public class Ec2Discovery extends ZenDiscovery {
public Ec2Discovery(Settings settings, ClusterName clusterName, ThreadPool threadPool, TransportService transportService, public Ec2Discovery(Settings settings, ClusterName clusterName, ThreadPool threadPool, TransportService transportService,
ClusterService clusterService, NodeSettingsService nodeSettingsService, ZenPingService pingService, ClusterService clusterService, NodeSettingsService nodeSettingsService, ZenPingService pingService,
DiscoveryNodeService discoveryNodeService, AwsEc2Service ec2Service, DiscoverySettings discoverySettings, DiscoveryNodeService discoveryNodeService, AwsEc2Service ec2Service, DiscoverySettings discoverySettings,
ElectMasterService electMasterService, DynamicSettings dynamicSettings) { ElectMasterService electMasterService, DynamicSettings dynamicSettings,
Version version) {
super(settings, clusterName, threadPool, transportService, clusterService, nodeSettingsService, super(settings, clusterName, threadPool, transportService, clusterService, nodeSettingsService,
discoveryNodeService, pingService, electMasterService, discoverySettings, dynamicSettings); discoveryNodeService, pingService, electMasterService, discoverySettings, dynamicSettings);
if (settings.getAsBoolean("cloud.enabled", true)) { if (settings.getAsBoolean("cloud.enabled", true)) {
@ -62,7 +64,7 @@ public class Ec2Discovery extends ZenDiscovery {
if (unicastZenPing != null) { if (unicastZenPing != null) {
// update the unicast zen ping to add cloud hosts provider // update the unicast zen ping to add cloud hosts provider
// and, while we are at it, use only it and not the multicast for example // and, while we are at it, use only it and not the multicast for example
unicastZenPing.addHostsProvider(new AwsEc2UnicastHostsProvider(settings, transportService, ec2Service.client())); unicastZenPing.addHostsProvider(new AwsEc2UnicastHostsProvider(settings, transportService, ec2Service.client(), version));
pingService.zenPings(ImmutableList.of(unicastZenPing)); pingService.zenPings(ImmutableList.of(unicastZenPing));
} else { } else {
logger.warn("failed to apply ec2 unicast discovery, no unicast ping found"); logger.warn("failed to apply ec2 unicast discovery, no unicast ping found");