diff --git a/plugins/discovery-azure-classic/src/main/java/org/elasticsearch/discovery/azure/classic/AzureUnicastHostsProvider.java b/plugins/discovery-azure-classic/src/main/java/org/elasticsearch/discovery/azure/classic/AzureUnicastHostsProvider.java index 2bc6cc4b130..482dafb008f 100644 --- a/plugins/discovery-azure-classic/src/main/java/org/elasticsearch/discovery/azure/classic/AzureUnicastHostsProvider.java +++ b/plugins/discovery-azure-classic/src/main/java/org/elasticsearch/discovery/azure/classic/AzureUnicastHostsProvider.java @@ -24,12 +24,10 @@ import com.microsoft.windowsazure.management.compute.models.DeploymentStatus; import com.microsoft.windowsazure.management.compute.models.HostedServiceGetDetailedResponse; import com.microsoft.windowsazure.management.compute.models.InstanceEndpoint; import com.microsoft.windowsazure.management.compute.models.RoleInstance; -import org.elasticsearch.Version; import org.elasticsearch.cloud.azure.classic.AzureServiceDisableException; import org.elasticsearch.cloud.azure.classic.AzureServiceRemoteException; import org.elasticsearch.cloud.azure.classic.management.AzureComputeService; import org.elasticsearch.cloud.azure.classic.management.AzureComputeService.Discovery; -import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Strings; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.network.InetAddresses; @@ -47,9 +45,6 @@ import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; -import static java.util.Collections.emptyMap; -import static java.util.Collections.emptySet; - public class AzureUnicastHostsProvider extends AbstractComponent implements UnicastHostsProvider { public enum HostType { @@ -104,7 +99,7 @@ public class AzureUnicastHostsProvider extends AbstractComponent implements Unic private final TimeValue refreshInterval; private long lastRefresh; - private List cachedDiscoNodes; + private List dynamicHosts; private final HostType hostType; private final String publicEndpointName; private final String deploymentName; @@ -137,30 +132,30 @@ public class AzureUnicastHostsProvider extends AbstractComponent implements Unic * Setting `cloud.azure.refresh_interval` to `0` will disable caching (default). */ @Override - public List buildDynamicNodes() { + public List buildDynamicHosts() { if (refreshInterval.millis() != 0) { - if (cachedDiscoNodes != null && + if (dynamicHosts != null && (refreshInterval.millis() < 0 || (System.currentTimeMillis() - lastRefresh) < refreshInterval.millis())) { logger.trace("using cache to retrieve node list"); - return cachedDiscoNodes; + return dynamicHosts; } lastRefresh = System.currentTimeMillis(); } logger.debug("start building nodes list using Azure API"); - cachedDiscoNodes = new ArrayList<>(); + dynamicHosts = new ArrayList<>(); HostedServiceGetDetailedResponse detailed; try { detailed = azureComputeService.getServiceDetails(); } catch (AzureServiceDisableException e) { logger.debug("Azure discovery service has been disabled. Returning empty list of nodes."); - return cachedDiscoNodes; + return dynamicHosts; } catch (AzureServiceRemoteException e) { // We got a remote exception logger.warn("can not get list of azure nodes: [{}]. Returning empty list of nodes.", e.getMessage()); logger.trace("AzureServiceRemoteException caught", e); - return cachedDiscoNodes; + return dynamicHosts; } InetAddress ipAddress = null; @@ -212,8 +207,7 @@ public class AzureUnicastHostsProvider extends AbstractComponent implements Unic TransportAddress[] addresses = transportService.addressesFromString(networkAddress, 1); for (TransportAddress address : addresses) { logger.trace("adding {}, transport_address {}", networkAddress, address); - cachedDiscoNodes.add(new DiscoveryNode("#cloud-" + instance.getInstanceName(), address, emptyMap(), - emptySet(), Version.CURRENT.minimumCompatibilityVersion())); + dynamicHosts.add(address); } } catch (Exception e) { logger.warn("can not convert [{}] to transport address. skipping. [{}]", networkAddress, e.getMessage()); @@ -221,9 +215,9 @@ public class AzureUnicastHostsProvider extends AbstractComponent implements Unic } } - logger.debug("{} node(s) added", cachedDiscoNodes.size()); + logger.debug("{} addresses added", dynamicHosts.size()); - return cachedDiscoNodes; + return dynamicHosts; } protected String resolveInstanceAddress(final HostType hostType, final RoleInstance instance) { diff --git a/plugins/discovery-ec2/src/main/java/org/elasticsearch/discovery/ec2/AwsEc2UnicastHostsProvider.java b/plugins/discovery-ec2/src/main/java/org/elasticsearch/discovery/ec2/AwsEc2UnicastHostsProvider.java index 2c536981b04..396e9f707d4 100644 --- a/plugins/discovery-ec2/src/main/java/org/elasticsearch/discovery/ec2/AwsEc2UnicastHostsProvider.java +++ b/plugins/discovery-ec2/src/main/java/org/elasticsearch/discovery/ec2/AwsEc2UnicastHostsProvider.java @@ -29,8 +29,6 @@ import com.amazonaws.services.ec2.model.Reservation; import com.amazonaws.services.ec2.model.Tag; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; -import org.elasticsearch.Version; -import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; @@ -46,8 +44,6 @@ import java.util.Map; import java.util.Set; import static java.util.Collections.disjoint; -import static java.util.Collections.emptyMap; -import static java.util.Collections.emptySet; import static org.elasticsearch.discovery.ec2.AwsEc2Service.HostType.TAG_PREFIX; import static org.elasticsearch.discovery.ec2.AwsEc2Service.HostType.PRIVATE_DNS; import static org.elasticsearch.discovery.ec2.AwsEc2Service.HostType.PRIVATE_IP; @@ -70,7 +66,7 @@ class AwsEc2UnicastHostsProvider extends AbstractComponent implements UnicastHos private final String hostType; - private final DiscoNodesCache discoNodes; + private final TransportAddressesCache dynamicHosts; AwsEc2UnicastHostsProvider(Settings settings, TransportService transportService, AwsEc2Service awsEc2Service) { super(settings); @@ -78,7 +74,7 @@ class AwsEc2UnicastHostsProvider extends AbstractComponent implements UnicastHos this.awsEc2Service = awsEc2Service; this.hostType = AwsEc2Service.HOST_TYPE_SETTING.get(settings); - this.discoNodes = new DiscoNodesCache(AwsEc2Service.NODE_CACHE_TIME_SETTING.get(settings)); + this.dynamicHosts = new TransportAddressesCache(AwsEc2Service.NODE_CACHE_TIME_SETTING.get(settings)); this.bindAnyGroup = AwsEc2Service.ANY_GROUP_SETTING.get(settings); this.groups = new HashSet<>(); @@ -96,13 +92,13 @@ class AwsEc2UnicastHostsProvider extends AbstractComponent implements UnicastHos } @Override - public List buildDynamicNodes() { - return discoNodes.getOrRefresh(); + public List buildDynamicHosts() { + return dynamicHosts.getOrRefresh(); } - protected List fetchDynamicNodes() { + protected List fetchDynamicNodes() { - final List discoNodes = new ArrayList<>(); + final List dynamicHosts = new ArrayList<>(); final DescribeInstancesResult descInstances; try (AmazonEc2Reference clientReference = awsEc2Service.client()) { @@ -115,7 +111,7 @@ class AwsEc2UnicastHostsProvider extends AbstractComponent implements UnicastHos } catch (final AmazonClientException e) { logger.info("Exception while retrieving instance list from AWS API: {}", e.getMessage()); logger.debug("Full exception:", e); - return discoNodes; + return dynamicHosts; } logger.trace("building dynamic unicast discovery nodes..."); @@ -179,8 +175,7 @@ class AwsEc2UnicastHostsProvider extends AbstractComponent implements UnicastHos final TransportAddress[] addresses = transportService.addressesFromString(address, 1); for (int i = 0; i < addresses.length; i++) { logger.trace("adding {}, address {}, transport_address {}", instance.getInstanceId(), address, addresses[i]); - discoNodes.add(new DiscoveryNode(instance.getInstanceId(), "#cloud-" + instance.getInstanceId() + "-" + i, - addresses[i], emptyMap(), emptySet(), Version.CURRENT.minimumCompatibilityVersion())); + dynamicHosts.add(addresses[i]); } } catch (final Exception e) { final String finalAddress = address; @@ -194,9 +189,9 @@ class AwsEc2UnicastHostsProvider extends AbstractComponent implements UnicastHos } } - logger.debug("using dynamic discovery nodes {}", discoNodes); + logger.debug("using dynamic transport addresses {}", dynamicHosts); - return discoNodes; + return dynamicHosts; } private DescribeInstancesRequest buildDescribeInstancesRequest() { @@ -222,11 +217,11 @@ class AwsEc2UnicastHostsProvider extends AbstractComponent implements UnicastHos return describeInstancesRequest; } - private final class DiscoNodesCache extends SingleObjectCache> { + private final class TransportAddressesCache extends SingleObjectCache> { private boolean empty = true; - protected DiscoNodesCache(TimeValue refreshInterval) { + protected TransportAddressesCache(TimeValue refreshInterval) { super(refreshInterval, new ArrayList<>()); } @@ -236,8 +231,8 @@ class AwsEc2UnicastHostsProvider extends AbstractComponent implements UnicastHos } @Override - protected List refresh() { - final List nodes = fetchDynamicNodes(); + protected List refresh() { + final List nodes = fetchDynamicNodes(); empty = nodes.isEmpty(); return nodes; } diff --git a/plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/Ec2DiscoveryTests.java b/plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/Ec2DiscoveryTests.java index 43cc924fadb..9dc2e02edc1 100644 --- a/plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/Ec2DiscoveryTests.java +++ b/plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/Ec2DiscoveryTests.java @@ -21,7 +21,6 @@ package org.elasticsearch.discovery.ec2; import com.amazonaws.services.ec2.model.Tag; import org.elasticsearch.Version; -import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Settings; @@ -87,16 +86,16 @@ public class Ec2DiscoveryTests extends ESTestCase { null); } - protected List buildDynamicNodes(Settings nodeSettings, int nodes) { - return buildDynamicNodes(nodeSettings, nodes, null); + protected List buildDynamicHosts(Settings nodeSettings, int nodes) { + return buildDynamicHosts(nodeSettings, nodes, null); } - protected List buildDynamicNodes(Settings nodeSettings, int nodes, List> tagsList) { + protected List buildDynamicHosts(Settings nodeSettings, int nodes, List> tagsList) { try (Ec2DiscoveryPluginMock plugin = new Ec2DiscoveryPluginMock(Settings.EMPTY, nodes, tagsList)) { AwsEc2UnicastHostsProvider provider = new AwsEc2UnicastHostsProvider(nodeSettings, transportService, plugin.ec2Service); - List discoveryNodes = provider.buildDynamicNodes(); - logger.debug("--> nodes found: {}", discoveryNodes); - return discoveryNodes; + List dynamicHosts = provider.buildDynamicHosts(); + logger.debug("--> addresses found: {}", dynamicHosts); + return dynamicHosts; } catch (IOException e) { fail("Unexpected IOException"); return null; @@ -107,7 +106,7 @@ public class Ec2DiscoveryTests extends ESTestCase { int nodes = randomInt(10); Settings nodeSettings = Settings.builder() .build(); - List discoveryNodes = buildDynamicNodes(nodeSettings, nodes); + List discoveryNodes = buildDynamicHosts(nodeSettings, nodes); assertThat(discoveryNodes, hasSize(nodes)); } @@ -119,12 +118,11 @@ public class Ec2DiscoveryTests extends ESTestCase { Settings nodeSettings = Settings.builder() .put(AwsEc2Service.HOST_TYPE_SETTING.getKey(), "private_ip") .build(); - List discoveryNodes = buildDynamicNodes(nodeSettings, nodes); - assertThat(discoveryNodes, hasSize(nodes)); + List transportAddresses = buildDynamicHosts(nodeSettings, nodes); + assertThat(transportAddresses, hasSize(nodes)); // We check that we are using here expected address int node = 1; - for (DiscoveryNode discoveryNode : discoveryNodes) { - TransportAddress address = discoveryNode.getAddress(); + for (TransportAddress address : transportAddresses) { TransportAddress expected = poorMansDNS.get(AmazonEC2Mock.PREFIX_PRIVATE_IP + node++); assertEquals(address, expected); } @@ -138,12 +136,11 @@ public class Ec2DiscoveryTests extends ESTestCase { Settings nodeSettings = Settings.builder() .put(AwsEc2Service.HOST_TYPE_SETTING.getKey(), "public_ip") .build(); - List discoveryNodes = buildDynamicNodes(nodeSettings, nodes); - assertThat(discoveryNodes, hasSize(nodes)); + List dynamicHosts = buildDynamicHosts(nodeSettings, nodes); + assertThat(dynamicHosts, hasSize(nodes)); // We check that we are using here expected address int node = 1; - for (DiscoveryNode discoveryNode : discoveryNodes) { - TransportAddress address = discoveryNode.getAddress(); + for (TransportAddress address : dynamicHosts) { TransportAddress expected = poorMansDNS.get(AmazonEC2Mock.PREFIX_PUBLIC_IP + node++); assertEquals(address, expected); } @@ -159,13 +156,12 @@ public class Ec2DiscoveryTests extends ESTestCase { Settings nodeSettings = Settings.builder() .put(AwsEc2Service.HOST_TYPE_SETTING.getKey(), "private_dns") .build(); - List discoveryNodes = buildDynamicNodes(nodeSettings, nodes); - assertThat(discoveryNodes, hasSize(nodes)); + List dynamicHosts = buildDynamicHosts(nodeSettings, nodes); + assertThat(dynamicHosts, hasSize(nodes)); // We check that we are using here expected address int node = 1; - for (DiscoveryNode discoveryNode : discoveryNodes) { + for (TransportAddress address : dynamicHosts) { String instanceId = "node" + node++; - TransportAddress address = discoveryNode.getAddress(); TransportAddress expected = poorMansDNS.get( AmazonEC2Mock.PREFIX_PRIVATE_DNS + instanceId + AmazonEC2Mock.SUFFIX_PRIVATE_DNS); assertEquals(address, expected); @@ -182,13 +178,12 @@ public class Ec2DiscoveryTests extends ESTestCase { Settings nodeSettings = Settings.builder() .put(AwsEc2Service.HOST_TYPE_SETTING.getKey(), "public_dns") .build(); - List discoveryNodes = buildDynamicNodes(nodeSettings, nodes); - assertThat(discoveryNodes, hasSize(nodes)); + List dynamicHosts = buildDynamicHosts(nodeSettings, nodes); + assertThat(dynamicHosts, hasSize(nodes)); // We check that we are using here expected address int node = 1; - for (DiscoveryNode discoveryNode : discoveryNodes) { + for (TransportAddress address : dynamicHosts) { String instanceId = "node" + node++; - TransportAddress address = discoveryNode.getAddress(); TransportAddress expected = poorMansDNS.get( AmazonEC2Mock.PREFIX_PUBLIC_DNS + instanceId + AmazonEC2Mock.SUFFIX_PUBLIC_DNS); assertEquals(address, expected); @@ -201,7 +196,7 @@ public class Ec2DiscoveryTests extends ESTestCase { .build(); IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> { - buildDynamicNodes(nodeSettings, 1); + buildDynamicHosts(nodeSettings, 1); }); assertThat(exception.getMessage(), containsString("does_not_exist is unknown for discovery.ec2.host_type")); } @@ -227,8 +222,8 @@ public class Ec2DiscoveryTests extends ESTestCase { } logger.info("started [{}] instances with [{}] stage=prod tag", nodes, prodInstances); - List discoveryNodes = buildDynamicNodes(nodeSettings, nodes, tagsList); - assertThat(discoveryNodes, hasSize(prodInstances)); + List dynamicHosts = buildDynamicHosts(nodeSettings, nodes, tagsList); + assertThat(dynamicHosts, hasSize(prodInstances)); } public void testFilterByMultipleTags() throws InterruptedException { @@ -258,8 +253,8 @@ public class Ec2DiscoveryTests extends ESTestCase { } logger.info("started [{}] instances with [{}] stage=prod tag", nodes, prodInstances); - List discoveryNodes = buildDynamicNodes(nodeSettings, nodes, tagsList); - assertThat(discoveryNodes, hasSize(prodInstances)); + List dynamicHosts = buildDynamicHosts(nodeSettings, nodes, tagsList); + assertThat(dynamicHosts, hasSize(prodInstances)); } public void testReadHostFromTag() throws InterruptedException, UnknownHostException { @@ -285,11 +280,11 @@ public class Ec2DiscoveryTests extends ESTestCase { } logger.info("started [{}] instances", nodes); - List discoveryNodes = buildDynamicNodes(nodeSettings, nodes, tagsList); - assertThat(discoveryNodes, hasSize(nodes)); - for (DiscoveryNode discoveryNode : discoveryNodes) { - TransportAddress address = discoveryNode.getAddress(); - TransportAddress expected = poorMansDNS.get(discoveryNode.getName()); + List dynamicHosts = buildDynamicHosts(nodeSettings, nodes, tagsList); + assertThat(dynamicHosts, hasSize(nodes)); + int node = 1; + for (TransportAddress address : dynamicHosts) { + TransportAddress expected = poorMansDNS.get("node" + node++); assertEquals(address, expected); } } @@ -306,13 +301,13 @@ public class Ec2DiscoveryTests extends ESTestCase { AwsEc2Service awsEc2Service = new AwsEc2ServiceMock(Settings.EMPTY, 1, null); DummyEc2HostProvider provider = new DummyEc2HostProvider(Settings.EMPTY, transportService, awsEc2Service) { @Override - protected List fetchDynamicNodes() { + protected List fetchDynamicNodes() { fetchCount++; return new ArrayList<>(); } }; for (int i=0; i<3; i++) { - provider.buildDynamicNodes(); + provider.buildDynamicHosts(); } assertThat(provider.fetchCount, is(3)); } @@ -323,18 +318,18 @@ public class Ec2DiscoveryTests extends ESTestCase { try (Ec2DiscoveryPluginMock plugin = new Ec2DiscoveryPluginMock(Settings.EMPTY)) { DummyEc2HostProvider provider = new DummyEc2HostProvider(builder.build(), transportService, plugin.ec2Service) { @Override - protected List fetchDynamicNodes() { + protected List fetchDynamicNodes() { fetchCount++; - return Ec2DiscoveryTests.this.buildDynamicNodes(Settings.EMPTY, 1); + return Ec2DiscoveryTests.this.buildDynamicHosts(Settings.EMPTY, 1); } }; for (int i=0; i<3; i++) { - provider.buildDynamicNodes(); + provider.buildDynamicHosts(); } assertThat(provider.fetchCount, is(1)); Thread.sleep(1_000L); // wait for cache to expire for (int i=0; i<3; i++) { - provider.buildDynamicNodes(); + provider.buildDynamicHosts(); } assertThat(provider.fetchCount, is(2)); } diff --git a/plugins/discovery-file/src/main/java/org/elasticsearch/discovery/file/FileBasedUnicastHostsProvider.java b/plugins/discovery-file/src/main/java/org/elasticsearch/discovery/file/FileBasedUnicastHostsProvider.java index 1029f907a66..7abcb445472 100644 --- a/plugins/discovery-file/src/main/java/org/elasticsearch/discovery/file/FileBasedUnicastHostsProvider.java +++ b/plugins/discovery-file/src/main/java/org/elasticsearch/discovery/file/FileBasedUnicastHostsProvider.java @@ -21,8 +21,8 @@ package org.elasticsearch.discovery.file; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; -import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.discovery.zen.UnicastHostsProvider; import org.elasticsearch.env.Environment; @@ -58,7 +58,6 @@ import static org.elasticsearch.discovery.zen.UnicastZenPing.resolveHostsLists; class FileBasedUnicastHostsProvider extends AbstractComponent implements UnicastHostsProvider { static final String UNICAST_HOSTS_FILE = "unicast_hosts.txt"; - static final String UNICAST_HOST_PREFIX = "#zen_file_unicast_host_"; private final TransportService transportService; private final ExecutorService executorService; @@ -76,7 +75,7 @@ class FileBasedUnicastHostsProvider extends AbstractComponent implements Unicast } @Override - public List buildDynamicNodes() { + public List buildDynamicHosts() { List hostsList; try (Stream lines = Files.lines(unicastHostsFilePath)) { hostsList = lines.filter(line -> line.startsWith("#") == false) // lines starting with `#` are comments @@ -91,23 +90,22 @@ class FileBasedUnicastHostsProvider extends AbstractComponent implements Unicast hostsList = Collections.emptyList(); } - final List discoNodes = new ArrayList<>(); + final List dynamicHosts = new ArrayList<>(); try { - discoNodes.addAll(resolveHostsLists( + dynamicHosts.addAll(resolveHostsLists( executorService, logger, hostsList, 1, transportService, - UNICAST_HOST_PREFIX, resolveTimeout)); } catch (InterruptedException e) { throw new RuntimeException(e); } - logger.debug("[discovery-file] Using dynamic discovery nodes {}", discoNodes); + logger.debug("[discovery-file] Using dynamic discovery nodes {}", dynamicHosts); - return discoNodes; + return dynamicHosts; } } diff --git a/plugins/discovery-file/src/test/java/org/elasticsearch/discovery/file/FileBasedUnicastHostsProviderTests.java b/plugins/discovery-file/src/test/java/org/elasticsearch/discovery/file/FileBasedUnicastHostsProviderTests.java index 3ddd15a7b4c..860d3537635 100644 --- a/plugins/discovery-file/src/test/java/org/elasticsearch/discovery/file/FileBasedUnicastHostsProviderTests.java +++ b/plugins/discovery-file/src/test/java/org/elasticsearch/discovery/file/FileBasedUnicastHostsProviderTests.java @@ -19,7 +19,6 @@ package org.elasticsearch.discovery.file; -import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Settings; @@ -50,7 +49,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import static org.elasticsearch.discovery.file.FileBasedUnicastHostsProvider.UNICAST_HOSTS_FILE; -import static org.elasticsearch.discovery.file.FileBasedUnicastHostsProvider.UNICAST_HOST_PREFIX; /** * Tests for {@link FileBasedUnicastHostsProvider}. @@ -104,23 +102,20 @@ public class FileBasedUnicastHostsProviderTests extends ESTestCase { public void testBuildDynamicNodes() throws Exception { final List hostEntries = Arrays.asList("#comment, should be ignored", "192.168.0.1", "192.168.0.2:9305", "255.255.23.15"); - final List nodes = setupAndRunHostProvider(hostEntries); + final List nodes = setupAndRunHostProvider(hostEntries); assertEquals(hostEntries.size() - 1, nodes.size()); // minus 1 because we are ignoring the first line that's a comment - assertEquals("192.168.0.1", nodes.get(0).getAddress().getAddress()); - assertEquals(9300, nodes.get(0).getAddress().getPort()); - assertEquals(UNICAST_HOST_PREFIX + "192.168.0.1_0#", nodes.get(0).getId()); - assertEquals("192.168.0.2", nodes.get(1).getAddress().getAddress()); - assertEquals(9305, nodes.get(1).getAddress().getPort()); - assertEquals(UNICAST_HOST_PREFIX + "192.168.0.2:9305_0#", nodes.get(1).getId()); - assertEquals("255.255.23.15", nodes.get(2).getAddress().getAddress()); - assertEquals(9300, nodes.get(2).getAddress().getPort()); - assertEquals(UNICAST_HOST_PREFIX + "255.255.23.15_0#", nodes.get(2).getId()); + assertEquals("192.168.0.1", nodes.get(0).getAddress()); + assertEquals(9300, nodes.get(0).getPort()); + assertEquals("192.168.0.2", nodes.get(1).getAddress()); + assertEquals(9305, nodes.get(1).getPort()); + assertEquals("255.255.23.15", nodes.get(2).getAddress()); + assertEquals(9300, nodes.get(2).getPort()); } public void testEmptyUnicastHostsFile() throws Exception { final List hostEntries = Collections.emptyList(); - final List nodes = setupAndRunHostProvider(hostEntries); - assertEquals(0, nodes.size()); + final List addresses = setupAndRunHostProvider(hostEntries); + assertEquals(0, addresses.size()); } public void testUnicastHostsDoesNotExist() throws Exception { @@ -129,27 +124,27 @@ public class FileBasedUnicastHostsProviderTests extends ESTestCase { .build(); final Environment environment = TestEnvironment.newEnvironment(settings); final FileBasedUnicastHostsProvider provider = new FileBasedUnicastHostsProvider(environment, transportService, executorService); - final List nodes = provider.buildDynamicNodes(); - assertEquals(0, nodes.size()); + final List addresses = provider.buildDynamicHosts(); + assertEquals(0, addresses.size()); } public void testInvalidHostEntries() throws Exception { List hostEntries = Arrays.asList("192.168.0.1:9300:9300"); - List nodes = setupAndRunHostProvider(hostEntries); - assertEquals(0, nodes.size()); + List addresses = setupAndRunHostProvider(hostEntries); + assertEquals(0, addresses.size()); } public void testSomeInvalidHostEntries() throws Exception { List hostEntries = Arrays.asList("192.168.0.1:9300:9300", "192.168.0.1:9301"); - List nodes = setupAndRunHostProvider(hostEntries); - assertEquals(1, nodes.size()); // only one of the two is valid and will be used - assertEquals("192.168.0.1", nodes.get(0).getAddress().getAddress()); - assertEquals(9301, nodes.get(0).getAddress().getPort()); + List addresses = setupAndRunHostProvider(hostEntries); + assertEquals(1, addresses.size()); // only one of the two is valid and will be used + assertEquals("192.168.0.1", addresses.get(0).getAddress()); + assertEquals(9301, addresses.get(0).getPort()); } // sets up the config dir, writes to the unicast hosts file in the config dir, // and then runs the file-based unicast host provider to get the list of discovery nodes - private List setupAndRunHostProvider(final List hostEntries) throws IOException { + private List setupAndRunHostProvider(final List hostEntries) throws IOException { final Path homeDir = createTempDir(); final Settings settings = Settings.builder() .put(Environment.PATH_HOME_SETTING.getKey(), homeDir) @@ -168,6 +163,6 @@ public class FileBasedUnicastHostsProviderTests extends ESTestCase { } return new FileBasedUnicastHostsProvider( - new Environment(settings, configPath), transportService, executorService).buildDynamicNodes(); + new Environment(settings, configPath), transportService, executorService).buildDynamicHosts(); } } diff --git a/plugins/discovery-gce/src/main/java/org/elasticsearch/discovery/gce/GceUnicastHostsProvider.java b/plugins/discovery-gce/src/main/java/org/elasticsearch/discovery/gce/GceUnicastHostsProvider.java index de290245895..790d70a8b99 100644 --- a/plugins/discovery-gce/src/main/java/org/elasticsearch/discovery/gce/GceUnicastHostsProvider.java +++ b/plugins/discovery-gce/src/main/java/org/elasticsearch/discovery/gce/GceUnicastHostsProvider.java @@ -31,9 +31,7 @@ import com.google.api.services.compute.model.Instance; import com.google.api.services.compute.model.NetworkInterface; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; -import org.elasticsearch.Version; import org.elasticsearch.cloud.gce.GceInstancesService; -import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Strings; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.network.NetworkAddress; @@ -47,8 +45,6 @@ import org.elasticsearch.discovery.zen.UnicastHostsProvider; import org.elasticsearch.transport.TransportService; import static java.util.Collections.emptyList; -import static java.util.Collections.emptyMap; -import static java.util.Collections.emptySet; public class GceUnicastHostsProvider extends AbstractComponent implements UnicastHostsProvider { @@ -72,7 +68,7 @@ public class GceUnicastHostsProvider extends AbstractComponent implements Unicas private final TimeValue refreshInterval; private long lastRefresh; - private List cachedDiscoNodes; + private List cachedDynamicHosts; public GceUnicastHostsProvider(Settings settings, GceInstancesService gceInstancesService, TransportService transportService, @@ -97,7 +93,7 @@ public class GceUnicastHostsProvider extends AbstractComponent implements Unicas * Information can be cached using `cloud.gce.refresh_interval` property if needed. */ @Override - public List buildDynamicNodes() { + public List buildDynamicHosts() { // We check that needed properties have been set if (this.project == null || this.project.isEmpty() || this.zones == null || this.zones.isEmpty()) { throw new IllegalArgumentException("one or more gce discovery settings are missing. " + @@ -106,16 +102,16 @@ public class GceUnicastHostsProvider extends AbstractComponent implements Unicas } if (refreshInterval.millis() != 0) { - if (cachedDiscoNodes != null && + if (cachedDynamicHosts != null && (refreshInterval.millis() < 0 || (System.currentTimeMillis() - lastRefresh) < refreshInterval.millis())) { if (logger.isTraceEnabled()) logger.trace("using cache to retrieve node list"); - return cachedDiscoNodes; + return cachedDynamicHosts; } lastRefresh = System.currentTimeMillis(); } logger.debug("start building nodes list using GCE API"); - cachedDiscoNodes = new ArrayList<>(); + cachedDynamicHosts = new ArrayList<>(); String ipAddress = null; try { InetAddress inetAddress = networkService.resolvePublishHostAddresses( @@ -133,7 +129,7 @@ public class GceUnicastHostsProvider extends AbstractComponent implements Unicas if (instances == null) { logger.trace("no instance found for project [{}], zones [{}].", this.project, this.zones); - return cachedDiscoNodes; + return cachedDynamicHosts; } for (Instance instance : instances) { @@ -238,8 +234,7 @@ public class GceUnicastHostsProvider extends AbstractComponent implements Unicas for (TransportAddress transportAddress : addresses) { logger.trace("adding {}, type {}, address {}, transport_address {}, status {}", name, type, ip_private, transportAddress, status); - cachedDiscoNodes.add(new DiscoveryNode("#cloud-" + name + "-" + 0, transportAddress, - emptyMap(), emptySet(), Version.CURRENT.minimumCompatibilityVersion())); + cachedDynamicHosts.add(transportAddress); } } } catch (Exception e) { @@ -252,9 +247,9 @@ public class GceUnicastHostsProvider extends AbstractComponent implements Unicas logger.warn("exception caught during discovery", e); } - logger.debug("{} node(s) added", cachedDiscoNodes.size()); - logger.debug("using dynamic discovery nodes {}", cachedDiscoNodes); + logger.debug("{} addresses added", cachedDynamicHosts.size()); + logger.debug("using transport addresses {}", cachedDynamicHosts); - return cachedDiscoNodes; + return cachedDynamicHosts; } } diff --git a/plugins/discovery-gce/src/test/java/org/elasticsearch/discovery/gce/GceDiscoveryTests.java b/plugins/discovery-gce/src/test/java/org/elasticsearch/discovery/gce/GceDiscoveryTests.java index 31ea9bdb1c2..a1944a15d80 100644 --- a/plugins/discovery-gce/src/test/java/org/elasticsearch/discovery/gce/GceDiscoveryTests.java +++ b/plugins/discovery-gce/src/test/java/org/elasticsearch/discovery/gce/GceDiscoveryTests.java @@ -21,9 +21,9 @@ package org.elasticsearch.discovery.gce; import org.elasticsearch.Version; import org.elasticsearch.cloud.gce.GceInstancesServiceImpl; -import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.threadpool.TestThreadPool; @@ -40,7 +40,6 @@ import java.util.Locale; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.hasSize; -import static org.hamcrest.Matchers.is; /** * This test class uses a GCE HTTP Mock system which allows to simulate JSON Responses. @@ -105,13 +104,13 @@ public class GceDiscoveryTests extends ESTestCase { } } - protected List buildDynamicNodes(GceInstancesServiceImpl gceInstancesService, Settings nodeSettings) { + protected List buildDynamicNodes(GceInstancesServiceImpl gceInstancesService, Settings nodeSettings) { GceUnicastHostsProvider provider = new GceUnicastHostsProvider(nodeSettings, gceInstancesService, transportService, new NetworkService(Collections.emptyList())); - List discoveryNodes = provider.buildDynamicNodes(); - logger.info("--> nodes found: {}", discoveryNodes); - return discoveryNodes; + List dynamicHosts = provider.buildDynamicHosts(); + logger.info("--> addresses found: {}", dynamicHosts); + return dynamicHosts; } public void testNodesWithDifferentTagsAndNoTagSet() { @@ -120,8 +119,8 @@ public class GceDiscoveryTests extends ESTestCase { .put(GceInstancesServiceImpl.ZONE_SETTING.getKey(), "europe-west1-b") .build(); mock = new GceInstancesServiceMock(nodeSettings); - List discoveryNodes = buildDynamicNodes(mock, nodeSettings); - assertThat(discoveryNodes, hasSize(2)); + List dynamicHosts = buildDynamicNodes(mock, nodeSettings); + assertThat(dynamicHosts, hasSize(2)); } public void testNodesWithDifferentTagsAndOneTagSet() { @@ -131,9 +130,8 @@ public class GceDiscoveryTests extends ESTestCase { .putList(GceUnicastHostsProvider.TAGS_SETTING.getKey(), "elasticsearch") .build(); mock = new GceInstancesServiceMock(nodeSettings); - List discoveryNodes = buildDynamicNodes(mock, nodeSettings); - assertThat(discoveryNodes, hasSize(1)); - assertThat(discoveryNodes.get(0).getId(), is("#cloud-test2-0")); + List dynamicHosts = buildDynamicNodes(mock, nodeSettings); + assertThat(dynamicHosts, hasSize(1)); } public void testNodesWithDifferentTagsAndTwoTagSet() { @@ -143,9 +141,8 @@ public class GceDiscoveryTests extends ESTestCase { .putList(GceUnicastHostsProvider.TAGS_SETTING.getKey(), "elasticsearch", "dev") .build(); mock = new GceInstancesServiceMock(nodeSettings); - List discoveryNodes = buildDynamicNodes(mock, nodeSettings); - assertThat(discoveryNodes, hasSize(1)); - assertThat(discoveryNodes.get(0).getId(), is("#cloud-test2-0")); + List dynamicHosts = buildDynamicNodes(mock, nodeSettings); + assertThat(dynamicHosts, hasSize(1)); } public void testNodesWithSameTagsAndNoTagSet() { @@ -154,8 +151,8 @@ public class GceDiscoveryTests extends ESTestCase { .put(GceInstancesServiceImpl.ZONE_SETTING.getKey(), "europe-west1-b") .build(); mock = new GceInstancesServiceMock(nodeSettings); - List discoveryNodes = buildDynamicNodes(mock, nodeSettings); - assertThat(discoveryNodes, hasSize(2)); + List dynamicHosts = buildDynamicNodes(mock, nodeSettings); + assertThat(dynamicHosts, hasSize(2)); } public void testNodesWithSameTagsAndOneTagSet() { @@ -165,8 +162,8 @@ public class GceDiscoveryTests extends ESTestCase { .putList(GceUnicastHostsProvider.TAGS_SETTING.getKey(), "elasticsearch") .build(); mock = new GceInstancesServiceMock(nodeSettings); - List discoveryNodes = buildDynamicNodes(mock, nodeSettings); - assertThat(discoveryNodes, hasSize(2)); + List dynamicHosts = buildDynamicNodes(mock, nodeSettings); + assertThat(dynamicHosts, hasSize(2)); } public void testNodesWithSameTagsAndTwoTagsSet() { @@ -176,8 +173,8 @@ public class GceDiscoveryTests extends ESTestCase { .putList(GceUnicastHostsProvider.TAGS_SETTING.getKey(), "elasticsearch", "dev") .build(); mock = new GceInstancesServiceMock(nodeSettings); - List discoveryNodes = buildDynamicNodes(mock, nodeSettings); - assertThat(discoveryNodes, hasSize(2)); + List dynamicHosts = buildDynamicNodes(mock, nodeSettings); + assertThat(dynamicHosts, hasSize(2)); } public void testMultipleZonesAndTwoNodesInSameZone() { @@ -186,8 +183,8 @@ public class GceDiscoveryTests extends ESTestCase { .putList(GceInstancesServiceImpl.ZONE_SETTING.getKey(), "us-central1-a", "europe-west1-b") .build(); mock = new GceInstancesServiceMock(nodeSettings); - List discoveryNodes = buildDynamicNodes(mock, nodeSettings); - assertThat(discoveryNodes, hasSize(2)); + List dynamicHosts = buildDynamicNodes(mock, nodeSettings); + assertThat(dynamicHosts, hasSize(2)); } public void testMultipleZonesAndTwoNodesInDifferentZones() { @@ -196,8 +193,8 @@ public class GceDiscoveryTests extends ESTestCase { .putList(GceInstancesServiceImpl.ZONE_SETTING.getKey(), "us-central1-a", "europe-west1-b") .build(); mock = new GceInstancesServiceMock(nodeSettings); - List discoveryNodes = buildDynamicNodes(mock, nodeSettings); - assertThat(discoveryNodes, hasSize(2)); + List dynamicHosts = buildDynamicNodes(mock, nodeSettings); + assertThat(dynamicHosts, hasSize(2)); } /** @@ -209,8 +206,8 @@ public class GceDiscoveryTests extends ESTestCase { .putList(GceInstancesServiceImpl.ZONE_SETTING.getKey(), "us-central1-a", "us-central1-b") .build(); mock = new GceInstancesServiceMock(nodeSettings); - List discoveryNodes = buildDynamicNodes(mock, nodeSettings); - assertThat(discoveryNodes, hasSize(0)); + List dynamicHosts = buildDynamicNodes(mock, nodeSettings); + assertThat(dynamicHosts, hasSize(0)); } public void testIllegalSettingsMissingAllRequired() { @@ -261,7 +258,7 @@ public class GceDiscoveryTests extends ESTestCase { .putList(GceInstancesServiceImpl.ZONE_SETTING.getKey(), "europe-west1-b", "us-central1-a") .build(); mock = new GceInstancesServiceMock(nodeSettings); - List discoveryNodes = buildDynamicNodes(mock, nodeSettings); - assertThat(discoveryNodes, hasSize(1)); + List dynamicHosts = buildDynamicNodes(mock, nodeSettings); + assertThat(dynamicHosts, hasSize(1)); } } diff --git a/server/src/main/java/org/elasticsearch/discovery/zen/UnicastHostsProvider.java b/server/src/main/java/org/elasticsearch/discovery/zen/UnicastHostsProvider.java index 9ff3215cd64..d719f9d123b 100644 --- a/server/src/main/java/org/elasticsearch/discovery/zen/UnicastHostsProvider.java +++ b/server/src/main/java/org/elasticsearch/discovery/zen/UnicastHostsProvider.java @@ -19,7 +19,7 @@ package org.elasticsearch.discovery.zen; -import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.transport.TransportAddress; import java.util.List; @@ -31,5 +31,5 @@ public interface UnicastHostsProvider { /** * Builds the dynamic list of unicast hosts to be used for unicast discovery. */ - List buildDynamicNodes(); + List buildDynamicHosts(); } diff --git a/server/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java b/server/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java index e9ac1deec0a..cbadbb4a1e0 100644 --- a/server/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java +++ b/server/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java @@ -118,9 +118,6 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing { private final AtomicInteger pingingRoundIdGenerator = new AtomicInteger(); - // used as a node id prefix for configured unicast host nodes/address - private static final String UNICAST_NODE_PREFIX = "#zen_unicast_"; - private final Map activePingingRounds = newConcurrentMap(); // a list of temporal responses a node will return for a request (holds responses from other nodes) @@ -184,23 +181,20 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing { * @param hosts the hosts to resolve * @param limitPortCounts the number of ports to resolve (should be 1 for non-local transport) * @param transportService the transport service - * @param nodeId_prefix a prefix to use for node ids * @param resolveTimeout the timeout before returning from hostname lookups - * @return a list of discovery nodes with resolved transport addresses + * @return a list of resolved transport addresses */ - public static List resolveHostsLists( + public static List resolveHostsLists( final ExecutorService executorService, final Logger logger, final List hosts, final int limitPortCounts, final TransportService transportService, - final String nodeId_prefix, final TimeValue resolveTimeout) throws InterruptedException { Objects.requireNonNull(executorService); Objects.requireNonNull(logger); Objects.requireNonNull(hosts); Objects.requireNonNull(transportService); - Objects.requireNonNull(nodeId_prefix); Objects.requireNonNull(resolveTimeout); if (resolveTimeout.nanos() < 0) { throw new IllegalArgumentException("resolve timeout must be non-negative but was [" + resolveTimeout + "]"); @@ -213,7 +207,7 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing { .collect(Collectors.toList()); final List> futures = executorService.invokeAll(callables, resolveTimeout.nanos(), TimeUnit.NANOSECONDS); - final List discoveryNodes = new ArrayList<>(); + final List transportAddresses = new ArrayList<>(); final Set localAddresses = new HashSet<>(); localAddresses.add(transportService.boundAddress().publishAddress()); localAddresses.addAll(Arrays.asList(transportService.boundAddress().boundAddresses())); @@ -231,13 +225,7 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing { final TransportAddress address = addresses[addressId]; // no point in pinging ourselves if (localAddresses.contains(address) == false) { - discoveryNodes.add( - new DiscoveryNode( - nodeId_prefix + hostname + "_" + addressId + "#", - address, - emptyMap(), - emptySet(), - Version.CURRENT.minimumCompatibilityVersion())); + transportAddresses.add(address); } } } catch (final ExecutionException e) { @@ -249,7 +237,7 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing { logger.warn("timed out after [{}] resolving host [{}]", resolveTimeout, hostname); } } - return discoveryNodes; + return Collections.unmodifiableList(transportAddresses); } @Override @@ -292,29 +280,28 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing { protected void ping(final Consumer resultsConsumer, final TimeValue scheduleDuration, final TimeValue requestDuration) { - final List seedNodes; + final List seedAddresses = new ArrayList<>(); try { - seedNodes = resolveHostsLists( + seedAddresses.addAll(resolveHostsLists( unicastZenPingExecutorService, logger, configuredHosts, limitPortCounts, transportService, - UNICAST_NODE_PREFIX, - resolveTimeout); + resolveTimeout)); } catch (InterruptedException e) { throw new RuntimeException(e); } - seedNodes.addAll(hostsProvider.buildDynamicNodes()); + seedAddresses.addAll(hostsProvider.buildDynamicHosts()); final DiscoveryNodes nodes = contextProvider.clusterState().nodes(); // add all possible master nodes that were active in the last known cluster configuration for (ObjectCursor masterNode : nodes.getMasterNodes().values()) { - seedNodes.add(masterNode.value); + seedAddresses.add(masterNode.value.getAddress()); } final ConnectionProfile connectionProfile = ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, requestDuration, requestDuration); - final PingingRound pingingRound = new PingingRound(pingingRoundIdGenerator.incrementAndGet(), seedNodes, resultsConsumer, + final PingingRound pingingRound = new PingingRound(pingingRoundIdGenerator.incrementAndGet(), seedAddresses, resultsConsumer, nodes.getLocalNode(), connectionProfile); activePingingRounds.put(pingingRound.id(), pingingRound); final AbstractRunnable pingSender = new AbstractRunnable() { @@ -356,17 +343,17 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing { private final Map tempConnections = new HashMap<>(); private final KeyedLock connectionLock = new KeyedLock<>(true); private final PingCollection pingCollection; - private final List seedNodes; + private final List seedAddresses; private final Consumer pingListener; private final DiscoveryNode localNode; private final ConnectionProfile connectionProfile; private AtomicBoolean closed = new AtomicBoolean(false); - PingingRound(int id, List seedNodes, Consumer resultsConsumer, DiscoveryNode localNode, + PingingRound(int id, List seedAddresses, Consumer resultsConsumer, DiscoveryNode localNode, ConnectionProfile connectionProfile) { this.id = id; - this.seedNodes = Collections.unmodifiableList(new ArrayList<>(seedNodes)); + this.seedAddresses = Collections.unmodifiableList(seedAddresses.stream().distinct().collect(Collectors.toList())); this.pingListener = resultsConsumer; this.localNode = localNode; this.connectionProfile = connectionProfile; @@ -381,9 +368,9 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing { return this.closed.get(); } - public List getSeedNodes() { + public List getSeedAddresses() { ensureOpen(); - return seedNodes; + return seedAddresses; } public Connection getOrConnect(DiscoveryNode node) throws IOException { @@ -457,26 +444,28 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing { final ClusterState lastState = contextProvider.clusterState(); final UnicastPingRequest pingRequest = new UnicastPingRequest(pingingRound.id(), timeout, createPingResponse(lastState)); - Set nodesFromResponses = temporalResponses.stream().map(pingResponse -> { + List temporalAddresses = temporalResponses.stream().map(pingResponse -> { assert clusterName.equals(pingResponse.clusterName()) : "got a ping request from a different cluster. expected " + clusterName + " got " + pingResponse.clusterName(); - return pingResponse.node(); - }).collect(Collectors.toSet()); - - // dedup by address - final Map uniqueNodesByAddress = - Stream.concat(pingingRound.getSeedNodes().stream(), nodesFromResponses.stream()) - .collect(Collectors.toMap(DiscoveryNode::getAddress, Function.identity(), (n1, n2) -> n1)); + return pingResponse.node().getAddress(); + }).collect(Collectors.toList()); + final Stream uniqueAddresses = Stream.concat(pingingRound.getSeedAddresses().stream(), + temporalAddresses.stream()).distinct(); // resolve what we can via the latest cluster state - final Set nodesToPing = uniqueNodesByAddress.values().stream() - .map(node -> { - DiscoveryNode foundNode = lastState.nodes().findByAddress(node.getAddress()); - if (foundNode == null) { - return node; - } else { + final Set nodesToPing = uniqueAddresses + .map(address -> { + DiscoveryNode foundNode = lastState.nodes().findByAddress(address); + if (foundNode != null && transportService.nodeConnected(foundNode)) { return foundNode; + } else { + return new DiscoveryNode( + address.toString(), + address, + emptyMap(), + emptySet(), + Version.CURRENT.minimumCompatibilityVersion()); } }).collect(Collectors.toSet()); diff --git a/server/src/test/java/org/elasticsearch/discovery/single/SingleNodeDiscoveryIT.java b/server/src/test/java/org/elasticsearch/discovery/single/SingleNodeDiscoveryIT.java index fdc36152cc8..33c87ea7f38 100644 --- a/server/src/test/java/org/elasticsearch/discovery/single/SingleNodeDiscoveryIT.java +++ b/server/src/test/java/org/elasticsearch/discovery/single/SingleNodeDiscoveryIT.java @@ -84,7 +84,7 @@ public class SingleNodeDiscoveryIT extends ESIntegTestCase { internalCluster().getInstance(TransportService.class); // try to ping the single node directly final UnicastHostsProvider provider = - () -> Collections.singletonList(nodeTransport.getLocalNode()); + () -> Collections.singletonList(nodeTransport.getLocalNode().getAddress()); final CountDownLatch latch = new CountDownLatch(1); final DiscoveryNodes nodes = DiscoveryNodes.builder() .add(nodeTransport.getLocalNode()) diff --git a/server/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java b/server/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java index f71ffe28b50..4aa75077431 100644 --- a/server/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java +++ b/server/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java @@ -408,19 +408,18 @@ public class UnicastZenPingTests extends ESTestCase { Collections.emptySet()); closeables.push(transportService); final int limitPortCounts = randomIntBetween(1, 10); - final List discoveryNodes = TestUnicastZenPing.resolveHostsLists( + final List transportAddresses = TestUnicastZenPing.resolveHostsLists( executorService, logger, Collections.singletonList("127.0.0.1"), limitPortCounts, transportService, - "test_", TimeValue.timeValueSeconds(1)); - assertThat(discoveryNodes, hasSize(limitPortCounts)); + assertThat(transportAddresses, hasSize(limitPortCounts)); final Set ports = new HashSet<>(); - for (final DiscoveryNode discoveryNode : discoveryNodes) { - assertTrue(discoveryNode.getAddress().address().getAddress().isLoopbackAddress()); - ports.add(discoveryNode.getAddress().getPort()); + for (final TransportAddress address : transportAddresses) { + assertTrue(address.address().getAddress().isLoopbackAddress()); + ports.add(address.getPort()); } assertThat(ports, equalTo(IntStream.range(9300, 9300 + limitPortCounts).mapToObj(m -> m).collect(Collectors.toSet()))); } @@ -453,19 +452,18 @@ public class UnicastZenPingTests extends ESTestCase { new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, Collections.emptySet()); closeables.push(transportService); - final List discoveryNodes = TestUnicastZenPing.resolveHostsLists( + final List transportAddresses = TestUnicastZenPing.resolveHostsLists( executorService, logger, Collections.singletonList(NetworkAddress.format(loopbackAddress)), 10, transportService, - "test_", TimeValue.timeValueSeconds(1)); - assertThat(discoveryNodes, hasSize(7)); + assertThat(transportAddresses, hasSize(7)); final Set ports = new HashSet<>(); - for (final DiscoveryNode discoveryNode : discoveryNodes) { - assertTrue(discoveryNode.getAddress().address().getAddress().isLoopbackAddress()); - ports.add(discoveryNode.getAddress().getPort()); + for (final TransportAddress address : transportAddresses) { + assertTrue(address.address().getAddress().isLoopbackAddress()); + ports.add(address.getPort()); } assertThat(ports, equalTo(IntStream.range(9303, 9310).mapToObj(m -> m).collect(Collectors.toSet()))); } @@ -505,17 +503,16 @@ public class UnicastZenPingTests extends ESTestCase { Collections.emptySet()); closeables.push(transportService); - final List discoveryNodes = TestUnicastZenPing.resolveHostsLists( + final List transportAddresses = TestUnicastZenPing.resolveHostsLists( executorService, logger, Arrays.asList(hostname), 1, transportService, - "test_", TimeValue.timeValueSeconds(1) ); - assertThat(discoveryNodes, empty()); + assertThat(transportAddresses, empty()); verify(logger).warn("failed to resolve host [" + hostname + "]", unknownHostException); } @@ -565,16 +562,15 @@ public class UnicastZenPingTests extends ESTestCase { closeables.push(transportService); final TimeValue resolveTimeout = TimeValue.timeValueSeconds(randomIntBetween(1, 3)); try { - final List discoveryNodes = TestUnicastZenPing.resolveHostsLists( + final List transportAddresses = TestUnicastZenPing.resolveHostsLists( executorService, logger, Arrays.asList("hostname1", "hostname2"), 1, transportService, - "test+", resolveTimeout); - assertThat(discoveryNodes, hasSize(1)); + assertThat(transportAddresses, hasSize(1)); verify(logger).trace( "resolved host [{}] to {}", "hostname1", new TransportAddress[]{new TransportAddress(TransportAddress.META_ADDRESS, 9300)}); @@ -732,17 +728,16 @@ public class UnicastZenPingTests extends ESTestCase { new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, Collections.emptySet()); closeables.push(transportService); - final List discoveryNodes = TestUnicastZenPing.resolveHostsLists( + final List transportAddresses = TestUnicastZenPing.resolveHostsLists( executorService, logger, Arrays.asList("127.0.0.1:9300:9300", "127.0.0.1:9301"), 1, transportService, - "test_", TimeValue.timeValueSeconds(1)); - assertThat(discoveryNodes, hasSize(1)); // only one of the two is valid and will be used - assertThat(discoveryNodes.get(0).getAddress().getAddress(), equalTo("127.0.0.1")); - assertThat(discoveryNodes.get(0).getAddress().getPort(), equalTo(9301)); + assertThat(transportAddresses, hasSize(1)); // only one of the two is valid and will be used + assertThat(transportAddresses.get(0).getAddress(), equalTo("127.0.0.1")); + assertThat(transportAddresses.get(0).getPort(), equalTo(9301)); verify(logger).warn(eq("failed to resolve host [127.0.0.1:9300:9300]"), Matchers.any(ExecutionException.class)); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/discovery/MockUncasedHostProvider.java b/test/framework/src/main/java/org/elasticsearch/test/discovery/MockUncasedHostProvider.java index 46bbdcc7646..2e60a3c518d 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/discovery/MockUncasedHostProvider.java +++ b/test/framework/src/main/java/org/elasticsearch/test/discovery/MockUncasedHostProvider.java @@ -21,6 +21,7 @@ package org.elasticsearch.test.discovery; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.discovery.zen.UnicastHostsProvider; @@ -55,7 +56,7 @@ public final class MockUncasedHostProvider implements UnicastHostsProvider, Clos } @Override - public List buildDynamicNodes() { + public List buildDynamicHosts() { final DiscoveryNode localNode = getNode(); assert localNode != null; synchronized (activeNodesPerCluster) { @@ -64,6 +65,7 @@ public final class MockUncasedHostProvider implements UnicastHostsProvider, Clos .map(MockUncasedHostProvider::getNode) .filter(Objects::nonNull) .filter(n -> localNode.equals(n) == false) + .map(DiscoveryNode::getAddress) .collect(Collectors.toList()); } }