Return transport addresses from UnicastHostsProvider (#31426)
With #20695 we removed local transport and there is just TransportAddress now. The UnicastHostsProvider currently returns DiscoveryNode instances, where, during pinging, we're actually only making use of the TransportAddress to establish a first connection to the possible new node. To simplify the interface, we can just return a list of transport addresses instead, which means that it's not necessary anymore to create fake node objects in each plugin just to return the address information.
This commit is contained in:
parent
86423f9563
commit
da69ab28c7
|
@ -24,12 +24,10 @@ import com.microsoft.windowsazure.management.compute.models.DeploymentStatus;
|
|||
import com.microsoft.windowsazure.management.compute.models.HostedServiceGetDetailedResponse;
|
||||
import com.microsoft.windowsazure.management.compute.models.InstanceEndpoint;
|
||||
import com.microsoft.windowsazure.management.compute.models.RoleInstance;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cloud.azure.classic.AzureServiceDisableException;
|
||||
import org.elasticsearch.cloud.azure.classic.AzureServiceRemoteException;
|
||||
import org.elasticsearch.cloud.azure.classic.management.AzureComputeService;
|
||||
import org.elasticsearch.cloud.azure.classic.management.AzureComputeService.Discovery;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.network.InetAddresses;
|
||||
|
@ -47,9 +45,6 @@ import java.net.InetSocketAddress;
|
|||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import static java.util.Collections.emptyMap;
|
||||
import static java.util.Collections.emptySet;
|
||||
|
||||
public class AzureUnicastHostsProvider extends AbstractComponent implements UnicastHostsProvider {
|
||||
|
||||
public enum HostType {
|
||||
|
@ -104,7 +99,7 @@ public class AzureUnicastHostsProvider extends AbstractComponent implements Unic
|
|||
|
||||
private final TimeValue refreshInterval;
|
||||
private long lastRefresh;
|
||||
private List<DiscoveryNode> cachedDiscoNodes;
|
||||
private List<TransportAddress> dynamicHosts;
|
||||
private final HostType hostType;
|
||||
private final String publicEndpointName;
|
||||
private final String deploymentName;
|
||||
|
@ -137,30 +132,30 @@ public class AzureUnicastHostsProvider extends AbstractComponent implements Unic
|
|||
* Setting `cloud.azure.refresh_interval` to `0` will disable caching (default).
|
||||
*/
|
||||
@Override
|
||||
public List<DiscoveryNode> buildDynamicNodes() {
|
||||
public List<TransportAddress> buildDynamicHosts() {
|
||||
if (refreshInterval.millis() != 0) {
|
||||
if (cachedDiscoNodes != null &&
|
||||
if (dynamicHosts != null &&
|
||||
(refreshInterval.millis() < 0 || (System.currentTimeMillis() - lastRefresh) < refreshInterval.millis())) {
|
||||
logger.trace("using cache to retrieve node list");
|
||||
return cachedDiscoNodes;
|
||||
return dynamicHosts;
|
||||
}
|
||||
lastRefresh = System.currentTimeMillis();
|
||||
}
|
||||
logger.debug("start building nodes list using Azure API");
|
||||
|
||||
cachedDiscoNodes = new ArrayList<>();
|
||||
dynamicHosts = new ArrayList<>();
|
||||
|
||||
HostedServiceGetDetailedResponse detailed;
|
||||
try {
|
||||
detailed = azureComputeService.getServiceDetails();
|
||||
} catch (AzureServiceDisableException e) {
|
||||
logger.debug("Azure discovery service has been disabled. Returning empty list of nodes.");
|
||||
return cachedDiscoNodes;
|
||||
return dynamicHosts;
|
||||
} catch (AzureServiceRemoteException e) {
|
||||
// We got a remote exception
|
||||
logger.warn("can not get list of azure nodes: [{}]. Returning empty list of nodes.", e.getMessage());
|
||||
logger.trace("AzureServiceRemoteException caught", e);
|
||||
return cachedDiscoNodes;
|
||||
return dynamicHosts;
|
||||
}
|
||||
|
||||
InetAddress ipAddress = null;
|
||||
|
@ -212,8 +207,7 @@ public class AzureUnicastHostsProvider extends AbstractComponent implements Unic
|
|||
TransportAddress[] addresses = transportService.addressesFromString(networkAddress, 1);
|
||||
for (TransportAddress address : addresses) {
|
||||
logger.trace("adding {}, transport_address {}", networkAddress, address);
|
||||
cachedDiscoNodes.add(new DiscoveryNode("#cloud-" + instance.getInstanceName(), address, emptyMap(),
|
||||
emptySet(), Version.CURRENT.minimumCompatibilityVersion()));
|
||||
dynamicHosts.add(address);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.warn("can not convert [{}] to transport address. skipping. [{}]", networkAddress, e.getMessage());
|
||||
|
@ -221,9 +215,9 @@ public class AzureUnicastHostsProvider extends AbstractComponent implements Unic
|
|||
}
|
||||
}
|
||||
|
||||
logger.debug("{} node(s) added", cachedDiscoNodes.size());
|
||||
logger.debug("{} addresses added", dynamicHosts.size());
|
||||
|
||||
return cachedDiscoNodes;
|
||||
return dynamicHosts;
|
||||
}
|
||||
|
||||
protected String resolveInstanceAddress(final HostType hostType, final RoleInstance instance) {
|
||||
|
|
|
@ -29,8 +29,6 @@ import com.amazonaws.services.ec2.model.Reservation;
|
|||
import com.amazonaws.services.ec2.model.Tag;
|
||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.apache.logging.log4j.util.Supplier;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
|
@ -46,8 +44,6 @@ import java.util.Map;
|
|||
import java.util.Set;
|
||||
|
||||
import static java.util.Collections.disjoint;
|
||||
import static java.util.Collections.emptyMap;
|
||||
import static java.util.Collections.emptySet;
|
||||
import static org.elasticsearch.discovery.ec2.AwsEc2Service.HostType.TAG_PREFIX;
|
||||
import static org.elasticsearch.discovery.ec2.AwsEc2Service.HostType.PRIVATE_DNS;
|
||||
import static org.elasticsearch.discovery.ec2.AwsEc2Service.HostType.PRIVATE_IP;
|
||||
|
@ -70,7 +66,7 @@ class AwsEc2UnicastHostsProvider extends AbstractComponent implements UnicastHos
|
|||
|
||||
private final String hostType;
|
||||
|
||||
private final DiscoNodesCache discoNodes;
|
||||
private final TransportAddressesCache dynamicHosts;
|
||||
|
||||
AwsEc2UnicastHostsProvider(Settings settings, TransportService transportService, AwsEc2Service awsEc2Service) {
|
||||
super(settings);
|
||||
|
@ -78,7 +74,7 @@ class AwsEc2UnicastHostsProvider extends AbstractComponent implements UnicastHos
|
|||
this.awsEc2Service = awsEc2Service;
|
||||
|
||||
this.hostType = AwsEc2Service.HOST_TYPE_SETTING.get(settings);
|
||||
this.discoNodes = new DiscoNodesCache(AwsEc2Service.NODE_CACHE_TIME_SETTING.get(settings));
|
||||
this.dynamicHosts = new TransportAddressesCache(AwsEc2Service.NODE_CACHE_TIME_SETTING.get(settings));
|
||||
|
||||
this.bindAnyGroup = AwsEc2Service.ANY_GROUP_SETTING.get(settings);
|
||||
this.groups = new HashSet<>();
|
||||
|
@ -96,13 +92,13 @@ class AwsEc2UnicastHostsProvider extends AbstractComponent implements UnicastHos
|
|||
}
|
||||
|
||||
@Override
|
||||
public List<DiscoveryNode> buildDynamicNodes() {
|
||||
return discoNodes.getOrRefresh();
|
||||
public List<TransportAddress> buildDynamicHosts() {
|
||||
return dynamicHosts.getOrRefresh();
|
||||
}
|
||||
|
||||
protected List<DiscoveryNode> fetchDynamicNodes() {
|
||||
protected List<TransportAddress> fetchDynamicNodes() {
|
||||
|
||||
final List<DiscoveryNode> discoNodes = new ArrayList<>();
|
||||
final List<TransportAddress> dynamicHosts = new ArrayList<>();
|
||||
|
||||
final DescribeInstancesResult descInstances;
|
||||
try (AmazonEc2Reference clientReference = awsEc2Service.client()) {
|
||||
|
@ -115,7 +111,7 @@ class AwsEc2UnicastHostsProvider extends AbstractComponent implements UnicastHos
|
|||
} catch (final AmazonClientException e) {
|
||||
logger.info("Exception while retrieving instance list from AWS API: {}", e.getMessage());
|
||||
logger.debug("Full exception:", e);
|
||||
return discoNodes;
|
||||
return dynamicHosts;
|
||||
}
|
||||
|
||||
logger.trace("building dynamic unicast discovery nodes...");
|
||||
|
@ -179,8 +175,7 @@ class AwsEc2UnicastHostsProvider extends AbstractComponent implements UnicastHos
|
|||
final TransportAddress[] addresses = transportService.addressesFromString(address, 1);
|
||||
for (int i = 0; i < addresses.length; i++) {
|
||||
logger.trace("adding {}, address {}, transport_address {}", instance.getInstanceId(), address, addresses[i]);
|
||||
discoNodes.add(new DiscoveryNode(instance.getInstanceId(), "#cloud-" + instance.getInstanceId() + "-" + i,
|
||||
addresses[i], emptyMap(), emptySet(), Version.CURRENT.minimumCompatibilityVersion()));
|
||||
dynamicHosts.add(addresses[i]);
|
||||
}
|
||||
} catch (final Exception e) {
|
||||
final String finalAddress = address;
|
||||
|
@ -194,9 +189,9 @@ class AwsEc2UnicastHostsProvider extends AbstractComponent implements UnicastHos
|
|||
}
|
||||
}
|
||||
|
||||
logger.debug("using dynamic discovery nodes {}", discoNodes);
|
||||
logger.debug("using dynamic transport addresses {}", dynamicHosts);
|
||||
|
||||
return discoNodes;
|
||||
return dynamicHosts;
|
||||
}
|
||||
|
||||
private DescribeInstancesRequest buildDescribeInstancesRequest() {
|
||||
|
@ -222,11 +217,11 @@ class AwsEc2UnicastHostsProvider extends AbstractComponent implements UnicastHos
|
|||
return describeInstancesRequest;
|
||||
}
|
||||
|
||||
private final class DiscoNodesCache extends SingleObjectCache<List<DiscoveryNode>> {
|
||||
private final class TransportAddressesCache extends SingleObjectCache<List<TransportAddress>> {
|
||||
|
||||
private boolean empty = true;
|
||||
|
||||
protected DiscoNodesCache(TimeValue refreshInterval) {
|
||||
protected TransportAddressesCache(TimeValue refreshInterval) {
|
||||
super(refreshInterval, new ArrayList<>());
|
||||
}
|
||||
|
||||
|
@ -236,8 +231,8 @@ class AwsEc2UnicastHostsProvider extends AbstractComponent implements UnicastHos
|
|||
}
|
||||
|
||||
@Override
|
||||
protected List<DiscoveryNode> refresh() {
|
||||
final List<DiscoveryNode> nodes = fetchDynamicNodes();
|
||||
protected List<TransportAddress> refresh() {
|
||||
final List<TransportAddress> nodes = fetchDynamicNodes();
|
||||
empty = nodes.isEmpty();
|
||||
return nodes;
|
||||
}
|
||||
|
|
|
@ -21,7 +21,6 @@ package org.elasticsearch.discovery.ec2;
|
|||
|
||||
import com.amazonaws.services.ec2.model.Tag;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.network.NetworkService;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
@ -87,16 +86,16 @@ public class Ec2DiscoveryTests extends ESTestCase {
|
|||
null);
|
||||
}
|
||||
|
||||
protected List<DiscoveryNode> buildDynamicNodes(Settings nodeSettings, int nodes) {
|
||||
return buildDynamicNodes(nodeSettings, nodes, null);
|
||||
protected List<TransportAddress> buildDynamicHosts(Settings nodeSettings, int nodes) {
|
||||
return buildDynamicHosts(nodeSettings, nodes, null);
|
||||
}
|
||||
|
||||
protected List<DiscoveryNode> buildDynamicNodes(Settings nodeSettings, int nodes, List<List<Tag>> tagsList) {
|
||||
protected List<TransportAddress> buildDynamicHosts(Settings nodeSettings, int nodes, List<List<Tag>> tagsList) {
|
||||
try (Ec2DiscoveryPluginMock plugin = new Ec2DiscoveryPluginMock(Settings.EMPTY, nodes, tagsList)) {
|
||||
AwsEc2UnicastHostsProvider provider = new AwsEc2UnicastHostsProvider(nodeSettings, transportService, plugin.ec2Service);
|
||||
List<DiscoveryNode> discoveryNodes = provider.buildDynamicNodes();
|
||||
logger.debug("--> nodes found: {}", discoveryNodes);
|
||||
return discoveryNodes;
|
||||
List<TransportAddress> dynamicHosts = provider.buildDynamicHosts();
|
||||
logger.debug("--> addresses found: {}", dynamicHosts);
|
||||
return dynamicHosts;
|
||||
} catch (IOException e) {
|
||||
fail("Unexpected IOException");
|
||||
return null;
|
||||
|
@ -107,7 +106,7 @@ public class Ec2DiscoveryTests extends ESTestCase {
|
|||
int nodes = randomInt(10);
|
||||
Settings nodeSettings = Settings.builder()
|
||||
.build();
|
||||
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(nodeSettings, nodes);
|
||||
List<TransportAddress> discoveryNodes = buildDynamicHosts(nodeSettings, nodes);
|
||||
assertThat(discoveryNodes, hasSize(nodes));
|
||||
}
|
||||
|
||||
|
@ -119,12 +118,11 @@ public class Ec2DiscoveryTests extends ESTestCase {
|
|||
Settings nodeSettings = Settings.builder()
|
||||
.put(AwsEc2Service.HOST_TYPE_SETTING.getKey(), "private_ip")
|
||||
.build();
|
||||
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(nodeSettings, nodes);
|
||||
assertThat(discoveryNodes, hasSize(nodes));
|
||||
List<TransportAddress> transportAddresses = buildDynamicHosts(nodeSettings, nodes);
|
||||
assertThat(transportAddresses, hasSize(nodes));
|
||||
// We check that we are using here expected address
|
||||
int node = 1;
|
||||
for (DiscoveryNode discoveryNode : discoveryNodes) {
|
||||
TransportAddress address = discoveryNode.getAddress();
|
||||
for (TransportAddress address : transportAddresses) {
|
||||
TransportAddress expected = poorMansDNS.get(AmazonEC2Mock.PREFIX_PRIVATE_IP + node++);
|
||||
assertEquals(address, expected);
|
||||
}
|
||||
|
@ -138,12 +136,11 @@ public class Ec2DiscoveryTests extends ESTestCase {
|
|||
Settings nodeSettings = Settings.builder()
|
||||
.put(AwsEc2Service.HOST_TYPE_SETTING.getKey(), "public_ip")
|
||||
.build();
|
||||
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(nodeSettings, nodes);
|
||||
assertThat(discoveryNodes, hasSize(nodes));
|
||||
List<TransportAddress> dynamicHosts = buildDynamicHosts(nodeSettings, nodes);
|
||||
assertThat(dynamicHosts, hasSize(nodes));
|
||||
// We check that we are using here expected address
|
||||
int node = 1;
|
||||
for (DiscoveryNode discoveryNode : discoveryNodes) {
|
||||
TransportAddress address = discoveryNode.getAddress();
|
||||
for (TransportAddress address : dynamicHosts) {
|
||||
TransportAddress expected = poorMansDNS.get(AmazonEC2Mock.PREFIX_PUBLIC_IP + node++);
|
||||
assertEquals(address, expected);
|
||||
}
|
||||
|
@ -159,13 +156,12 @@ public class Ec2DiscoveryTests extends ESTestCase {
|
|||
Settings nodeSettings = Settings.builder()
|
||||
.put(AwsEc2Service.HOST_TYPE_SETTING.getKey(), "private_dns")
|
||||
.build();
|
||||
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(nodeSettings, nodes);
|
||||
assertThat(discoveryNodes, hasSize(nodes));
|
||||
List<TransportAddress> dynamicHosts = buildDynamicHosts(nodeSettings, nodes);
|
||||
assertThat(dynamicHosts, hasSize(nodes));
|
||||
// We check that we are using here expected address
|
||||
int node = 1;
|
||||
for (DiscoveryNode discoveryNode : discoveryNodes) {
|
||||
for (TransportAddress address : dynamicHosts) {
|
||||
String instanceId = "node" + node++;
|
||||
TransportAddress address = discoveryNode.getAddress();
|
||||
TransportAddress expected = poorMansDNS.get(
|
||||
AmazonEC2Mock.PREFIX_PRIVATE_DNS + instanceId + AmazonEC2Mock.SUFFIX_PRIVATE_DNS);
|
||||
assertEquals(address, expected);
|
||||
|
@ -182,13 +178,12 @@ public class Ec2DiscoveryTests extends ESTestCase {
|
|||
Settings nodeSettings = Settings.builder()
|
||||
.put(AwsEc2Service.HOST_TYPE_SETTING.getKey(), "public_dns")
|
||||
.build();
|
||||
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(nodeSettings, nodes);
|
||||
assertThat(discoveryNodes, hasSize(nodes));
|
||||
List<TransportAddress> dynamicHosts = buildDynamicHosts(nodeSettings, nodes);
|
||||
assertThat(dynamicHosts, hasSize(nodes));
|
||||
// We check that we are using here expected address
|
||||
int node = 1;
|
||||
for (DiscoveryNode discoveryNode : discoveryNodes) {
|
||||
for (TransportAddress address : dynamicHosts) {
|
||||
String instanceId = "node" + node++;
|
||||
TransportAddress address = discoveryNode.getAddress();
|
||||
TransportAddress expected = poorMansDNS.get(
|
||||
AmazonEC2Mock.PREFIX_PUBLIC_DNS + instanceId + AmazonEC2Mock.SUFFIX_PUBLIC_DNS);
|
||||
assertEquals(address, expected);
|
||||
|
@ -201,7 +196,7 @@ public class Ec2DiscoveryTests extends ESTestCase {
|
|||
.build();
|
||||
|
||||
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> {
|
||||
buildDynamicNodes(nodeSettings, 1);
|
||||
buildDynamicHosts(nodeSettings, 1);
|
||||
});
|
||||
assertThat(exception.getMessage(), containsString("does_not_exist is unknown for discovery.ec2.host_type"));
|
||||
}
|
||||
|
@ -227,8 +222,8 @@ public class Ec2DiscoveryTests extends ESTestCase {
|
|||
}
|
||||
|
||||
logger.info("started [{}] instances with [{}] stage=prod tag", nodes, prodInstances);
|
||||
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(nodeSettings, nodes, tagsList);
|
||||
assertThat(discoveryNodes, hasSize(prodInstances));
|
||||
List<TransportAddress> dynamicHosts = buildDynamicHosts(nodeSettings, nodes, tagsList);
|
||||
assertThat(dynamicHosts, hasSize(prodInstances));
|
||||
}
|
||||
|
||||
public void testFilterByMultipleTags() throws InterruptedException {
|
||||
|
@ -258,8 +253,8 @@ public class Ec2DiscoveryTests extends ESTestCase {
|
|||
}
|
||||
|
||||
logger.info("started [{}] instances with [{}] stage=prod tag", nodes, prodInstances);
|
||||
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(nodeSettings, nodes, tagsList);
|
||||
assertThat(discoveryNodes, hasSize(prodInstances));
|
||||
List<TransportAddress> dynamicHosts = buildDynamicHosts(nodeSettings, nodes, tagsList);
|
||||
assertThat(dynamicHosts, hasSize(prodInstances));
|
||||
}
|
||||
|
||||
public void testReadHostFromTag() throws InterruptedException, UnknownHostException {
|
||||
|
@ -285,11 +280,11 @@ public class Ec2DiscoveryTests extends ESTestCase {
|
|||
}
|
||||
|
||||
logger.info("started [{}] instances", nodes);
|
||||
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(nodeSettings, nodes, tagsList);
|
||||
assertThat(discoveryNodes, hasSize(nodes));
|
||||
for (DiscoveryNode discoveryNode : discoveryNodes) {
|
||||
TransportAddress address = discoveryNode.getAddress();
|
||||
TransportAddress expected = poorMansDNS.get(discoveryNode.getName());
|
||||
List<TransportAddress> dynamicHosts = buildDynamicHosts(nodeSettings, nodes, tagsList);
|
||||
assertThat(dynamicHosts, hasSize(nodes));
|
||||
int node = 1;
|
||||
for (TransportAddress address : dynamicHosts) {
|
||||
TransportAddress expected = poorMansDNS.get("node" + node++);
|
||||
assertEquals(address, expected);
|
||||
}
|
||||
}
|
||||
|
@ -306,13 +301,13 @@ public class Ec2DiscoveryTests extends ESTestCase {
|
|||
AwsEc2Service awsEc2Service = new AwsEc2ServiceMock(Settings.EMPTY, 1, null);
|
||||
DummyEc2HostProvider provider = new DummyEc2HostProvider(Settings.EMPTY, transportService, awsEc2Service) {
|
||||
@Override
|
||||
protected List<DiscoveryNode> fetchDynamicNodes() {
|
||||
protected List<TransportAddress> fetchDynamicNodes() {
|
||||
fetchCount++;
|
||||
return new ArrayList<>();
|
||||
}
|
||||
};
|
||||
for (int i=0; i<3; i++) {
|
||||
provider.buildDynamicNodes();
|
||||
provider.buildDynamicHosts();
|
||||
}
|
||||
assertThat(provider.fetchCount, is(3));
|
||||
}
|
||||
|
@ -323,18 +318,18 @@ public class Ec2DiscoveryTests extends ESTestCase {
|
|||
try (Ec2DiscoveryPluginMock plugin = new Ec2DiscoveryPluginMock(Settings.EMPTY)) {
|
||||
DummyEc2HostProvider provider = new DummyEc2HostProvider(builder.build(), transportService, plugin.ec2Service) {
|
||||
@Override
|
||||
protected List<DiscoveryNode> fetchDynamicNodes() {
|
||||
protected List<TransportAddress> fetchDynamicNodes() {
|
||||
fetchCount++;
|
||||
return Ec2DiscoveryTests.this.buildDynamicNodes(Settings.EMPTY, 1);
|
||||
return Ec2DiscoveryTests.this.buildDynamicHosts(Settings.EMPTY, 1);
|
||||
}
|
||||
};
|
||||
for (int i=0; i<3; i++) {
|
||||
provider.buildDynamicNodes();
|
||||
provider.buildDynamicHosts();
|
||||
}
|
||||
assertThat(provider.fetchCount, is(1));
|
||||
Thread.sleep(1_000L); // wait for cache to expire
|
||||
for (int i=0; i<3; i++) {
|
||||
provider.buildDynamicNodes();
|
||||
provider.buildDynamicHosts();
|
||||
}
|
||||
assertThat(provider.fetchCount, is(2));
|
||||
}
|
||||
|
|
|
@ -21,8 +21,8 @@ package org.elasticsearch.discovery.file;
|
|||
|
||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.apache.logging.log4j.util.Supplier;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.discovery.zen.UnicastHostsProvider;
|
||||
import org.elasticsearch.env.Environment;
|
||||
|
@ -58,7 +58,6 @@ import static org.elasticsearch.discovery.zen.UnicastZenPing.resolveHostsLists;
|
|||
class FileBasedUnicastHostsProvider extends AbstractComponent implements UnicastHostsProvider {
|
||||
|
||||
static final String UNICAST_HOSTS_FILE = "unicast_hosts.txt";
|
||||
static final String UNICAST_HOST_PREFIX = "#zen_file_unicast_host_";
|
||||
|
||||
private final TransportService transportService;
|
||||
private final ExecutorService executorService;
|
||||
|
@ -76,7 +75,7 @@ class FileBasedUnicastHostsProvider extends AbstractComponent implements Unicast
|
|||
}
|
||||
|
||||
@Override
|
||||
public List<DiscoveryNode> buildDynamicNodes() {
|
||||
public List<TransportAddress> buildDynamicHosts() {
|
||||
List<String> hostsList;
|
||||
try (Stream<String> lines = Files.lines(unicastHostsFilePath)) {
|
||||
hostsList = lines.filter(line -> line.startsWith("#") == false) // lines starting with `#` are comments
|
||||
|
@ -91,23 +90,22 @@ class FileBasedUnicastHostsProvider extends AbstractComponent implements Unicast
|
|||
hostsList = Collections.emptyList();
|
||||
}
|
||||
|
||||
final List<DiscoveryNode> discoNodes = new ArrayList<>();
|
||||
final List<TransportAddress> dynamicHosts = new ArrayList<>();
|
||||
try {
|
||||
discoNodes.addAll(resolveHostsLists(
|
||||
dynamicHosts.addAll(resolveHostsLists(
|
||||
executorService,
|
||||
logger,
|
||||
hostsList,
|
||||
1,
|
||||
transportService,
|
||||
UNICAST_HOST_PREFIX,
|
||||
resolveTimeout));
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
logger.debug("[discovery-file] Using dynamic discovery nodes {}", discoNodes);
|
||||
logger.debug("[discovery-file] Using dynamic discovery nodes {}", dynamicHosts);
|
||||
|
||||
return discoNodes;
|
||||
return dynamicHosts;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -19,7 +19,6 @@
|
|||
|
||||
package org.elasticsearch.discovery.file;
|
||||
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.network.NetworkService;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
@ -50,7 +49,6 @@ import java.util.concurrent.ExecutorService;
|
|||
import java.util.concurrent.Executors;
|
||||
|
||||
import static org.elasticsearch.discovery.file.FileBasedUnicastHostsProvider.UNICAST_HOSTS_FILE;
|
||||
import static org.elasticsearch.discovery.file.FileBasedUnicastHostsProvider.UNICAST_HOST_PREFIX;
|
||||
|
||||
/**
|
||||
* Tests for {@link FileBasedUnicastHostsProvider}.
|
||||
|
@ -104,23 +102,20 @@ public class FileBasedUnicastHostsProviderTests extends ESTestCase {
|
|||
|
||||
public void testBuildDynamicNodes() throws Exception {
|
||||
final List<String> hostEntries = Arrays.asList("#comment, should be ignored", "192.168.0.1", "192.168.0.2:9305", "255.255.23.15");
|
||||
final List<DiscoveryNode> nodes = setupAndRunHostProvider(hostEntries);
|
||||
final List<TransportAddress> nodes = setupAndRunHostProvider(hostEntries);
|
||||
assertEquals(hostEntries.size() - 1, nodes.size()); // minus 1 because we are ignoring the first line that's a comment
|
||||
assertEquals("192.168.0.1", nodes.get(0).getAddress().getAddress());
|
||||
assertEquals(9300, nodes.get(0).getAddress().getPort());
|
||||
assertEquals(UNICAST_HOST_PREFIX + "192.168.0.1_0#", nodes.get(0).getId());
|
||||
assertEquals("192.168.0.2", nodes.get(1).getAddress().getAddress());
|
||||
assertEquals(9305, nodes.get(1).getAddress().getPort());
|
||||
assertEquals(UNICAST_HOST_PREFIX + "192.168.0.2:9305_0#", nodes.get(1).getId());
|
||||
assertEquals("255.255.23.15", nodes.get(2).getAddress().getAddress());
|
||||
assertEquals(9300, nodes.get(2).getAddress().getPort());
|
||||
assertEquals(UNICAST_HOST_PREFIX + "255.255.23.15_0#", nodes.get(2).getId());
|
||||
assertEquals("192.168.0.1", nodes.get(0).getAddress());
|
||||
assertEquals(9300, nodes.get(0).getPort());
|
||||
assertEquals("192.168.0.2", nodes.get(1).getAddress());
|
||||
assertEquals(9305, nodes.get(1).getPort());
|
||||
assertEquals("255.255.23.15", nodes.get(2).getAddress());
|
||||
assertEquals(9300, nodes.get(2).getPort());
|
||||
}
|
||||
|
||||
public void testEmptyUnicastHostsFile() throws Exception {
|
||||
final List<String> hostEntries = Collections.emptyList();
|
||||
final List<DiscoveryNode> nodes = setupAndRunHostProvider(hostEntries);
|
||||
assertEquals(0, nodes.size());
|
||||
final List<TransportAddress> addresses = setupAndRunHostProvider(hostEntries);
|
||||
assertEquals(0, addresses.size());
|
||||
}
|
||||
|
||||
public void testUnicastHostsDoesNotExist() throws Exception {
|
||||
|
@ -129,27 +124,27 @@ public class FileBasedUnicastHostsProviderTests extends ESTestCase {
|
|||
.build();
|
||||
final Environment environment = TestEnvironment.newEnvironment(settings);
|
||||
final FileBasedUnicastHostsProvider provider = new FileBasedUnicastHostsProvider(environment, transportService, executorService);
|
||||
final List<DiscoveryNode> nodes = provider.buildDynamicNodes();
|
||||
assertEquals(0, nodes.size());
|
||||
final List<TransportAddress> addresses = provider.buildDynamicHosts();
|
||||
assertEquals(0, addresses.size());
|
||||
}
|
||||
|
||||
public void testInvalidHostEntries() throws Exception {
|
||||
List<String> hostEntries = Arrays.asList("192.168.0.1:9300:9300");
|
||||
List<DiscoveryNode> nodes = setupAndRunHostProvider(hostEntries);
|
||||
assertEquals(0, nodes.size());
|
||||
List<TransportAddress> addresses = setupAndRunHostProvider(hostEntries);
|
||||
assertEquals(0, addresses.size());
|
||||
}
|
||||
|
||||
public void testSomeInvalidHostEntries() throws Exception {
|
||||
List<String> hostEntries = Arrays.asList("192.168.0.1:9300:9300", "192.168.0.1:9301");
|
||||
List<DiscoveryNode> nodes = setupAndRunHostProvider(hostEntries);
|
||||
assertEquals(1, nodes.size()); // only one of the two is valid and will be used
|
||||
assertEquals("192.168.0.1", nodes.get(0).getAddress().getAddress());
|
||||
assertEquals(9301, nodes.get(0).getAddress().getPort());
|
||||
List<TransportAddress> addresses = setupAndRunHostProvider(hostEntries);
|
||||
assertEquals(1, addresses.size()); // only one of the two is valid and will be used
|
||||
assertEquals("192.168.0.1", addresses.get(0).getAddress());
|
||||
assertEquals(9301, addresses.get(0).getPort());
|
||||
}
|
||||
|
||||
// sets up the config dir, writes to the unicast hosts file in the config dir,
|
||||
// and then runs the file-based unicast host provider to get the list of discovery nodes
|
||||
private List<DiscoveryNode> setupAndRunHostProvider(final List<String> hostEntries) throws IOException {
|
||||
private List<TransportAddress> setupAndRunHostProvider(final List<String> hostEntries) throws IOException {
|
||||
final Path homeDir = createTempDir();
|
||||
final Settings settings = Settings.builder()
|
||||
.put(Environment.PATH_HOME_SETTING.getKey(), homeDir)
|
||||
|
@ -168,6 +163,6 @@ public class FileBasedUnicastHostsProviderTests extends ESTestCase {
|
|||
}
|
||||
|
||||
return new FileBasedUnicastHostsProvider(
|
||||
new Environment(settings, configPath), transportService, executorService).buildDynamicNodes();
|
||||
new Environment(settings, configPath), transportService, executorService).buildDynamicHosts();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -31,9 +31,7 @@ import com.google.api.services.compute.model.Instance;
|
|||
import com.google.api.services.compute.model.NetworkInterface;
|
||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.apache.logging.log4j.util.Supplier;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cloud.gce.GceInstancesService;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.network.NetworkAddress;
|
||||
|
@ -47,8 +45,6 @@ import org.elasticsearch.discovery.zen.UnicastHostsProvider;
|
|||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import static java.util.Collections.emptyList;
|
||||
import static java.util.Collections.emptyMap;
|
||||
import static java.util.Collections.emptySet;
|
||||
|
||||
public class GceUnicastHostsProvider extends AbstractComponent implements UnicastHostsProvider {
|
||||
|
||||
|
@ -72,7 +68,7 @@ public class GceUnicastHostsProvider extends AbstractComponent implements Unicas
|
|||
|
||||
private final TimeValue refreshInterval;
|
||||
private long lastRefresh;
|
||||
private List<DiscoveryNode> cachedDiscoNodes;
|
||||
private List<TransportAddress> cachedDynamicHosts;
|
||||
|
||||
public GceUnicastHostsProvider(Settings settings, GceInstancesService gceInstancesService,
|
||||
TransportService transportService,
|
||||
|
@ -97,7 +93,7 @@ public class GceUnicastHostsProvider extends AbstractComponent implements Unicas
|
|||
* Information can be cached using `cloud.gce.refresh_interval` property if needed.
|
||||
*/
|
||||
@Override
|
||||
public List<DiscoveryNode> buildDynamicNodes() {
|
||||
public List<TransportAddress> buildDynamicHosts() {
|
||||
// We check that needed properties have been set
|
||||
if (this.project == null || this.project.isEmpty() || this.zones == null || this.zones.isEmpty()) {
|
||||
throw new IllegalArgumentException("one or more gce discovery settings are missing. " +
|
||||
|
@ -106,16 +102,16 @@ public class GceUnicastHostsProvider extends AbstractComponent implements Unicas
|
|||
}
|
||||
|
||||
if (refreshInterval.millis() != 0) {
|
||||
if (cachedDiscoNodes != null &&
|
||||
if (cachedDynamicHosts != null &&
|
||||
(refreshInterval.millis() < 0 || (System.currentTimeMillis() - lastRefresh) < refreshInterval.millis())) {
|
||||
if (logger.isTraceEnabled()) logger.trace("using cache to retrieve node list");
|
||||
return cachedDiscoNodes;
|
||||
return cachedDynamicHosts;
|
||||
}
|
||||
lastRefresh = System.currentTimeMillis();
|
||||
}
|
||||
logger.debug("start building nodes list using GCE API");
|
||||
|
||||
cachedDiscoNodes = new ArrayList<>();
|
||||
cachedDynamicHosts = new ArrayList<>();
|
||||
String ipAddress = null;
|
||||
try {
|
||||
InetAddress inetAddress = networkService.resolvePublishHostAddresses(
|
||||
|
@ -133,7 +129,7 @@ public class GceUnicastHostsProvider extends AbstractComponent implements Unicas
|
|||
|
||||
if (instances == null) {
|
||||
logger.trace("no instance found for project [{}], zones [{}].", this.project, this.zones);
|
||||
return cachedDiscoNodes;
|
||||
return cachedDynamicHosts;
|
||||
}
|
||||
|
||||
for (Instance instance : instances) {
|
||||
|
@ -238,8 +234,7 @@ public class GceUnicastHostsProvider extends AbstractComponent implements Unicas
|
|||
for (TransportAddress transportAddress : addresses) {
|
||||
logger.trace("adding {}, type {}, address {}, transport_address {}, status {}", name, type,
|
||||
ip_private, transportAddress, status);
|
||||
cachedDiscoNodes.add(new DiscoveryNode("#cloud-" + name + "-" + 0, transportAddress,
|
||||
emptyMap(), emptySet(), Version.CURRENT.minimumCompatibilityVersion()));
|
||||
cachedDynamicHosts.add(transportAddress);
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
|
@ -252,9 +247,9 @@ public class GceUnicastHostsProvider extends AbstractComponent implements Unicas
|
|||
logger.warn("exception caught during discovery", e);
|
||||
}
|
||||
|
||||
logger.debug("{} node(s) added", cachedDiscoNodes.size());
|
||||
logger.debug("using dynamic discovery nodes {}", cachedDiscoNodes);
|
||||
logger.debug("{} addresses added", cachedDynamicHosts.size());
|
||||
logger.debug("using transport addresses {}", cachedDynamicHosts);
|
||||
|
||||
return cachedDiscoNodes;
|
||||
return cachedDynamicHosts;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,9 +21,9 @@ package org.elasticsearch.discovery.gce;
|
|||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cloud.gce.GceInstancesServiceImpl;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.network.NetworkService;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.transport.MockTransportService;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
|
@ -40,7 +40,6 @@ import java.util.Locale;
|
|||
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
|
||||
/**
|
||||
* This test class uses a GCE HTTP Mock system which allows to simulate JSON Responses.
|
||||
|
@ -105,13 +104,13 @@ public class GceDiscoveryTests extends ESTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
protected List<DiscoveryNode> buildDynamicNodes(GceInstancesServiceImpl gceInstancesService, Settings nodeSettings) {
|
||||
protected List<TransportAddress> buildDynamicNodes(GceInstancesServiceImpl gceInstancesService, Settings nodeSettings) {
|
||||
GceUnicastHostsProvider provider = new GceUnicastHostsProvider(nodeSettings, gceInstancesService,
|
||||
transportService, new NetworkService(Collections.emptyList()));
|
||||
|
||||
List<DiscoveryNode> discoveryNodes = provider.buildDynamicNodes();
|
||||
logger.info("--> nodes found: {}", discoveryNodes);
|
||||
return discoveryNodes;
|
||||
List<TransportAddress> dynamicHosts = provider.buildDynamicHosts();
|
||||
logger.info("--> addresses found: {}", dynamicHosts);
|
||||
return dynamicHosts;
|
||||
}
|
||||
|
||||
public void testNodesWithDifferentTagsAndNoTagSet() {
|
||||
|
@ -120,8 +119,8 @@ public class GceDiscoveryTests extends ESTestCase {
|
|||
.put(GceInstancesServiceImpl.ZONE_SETTING.getKey(), "europe-west1-b")
|
||||
.build();
|
||||
mock = new GceInstancesServiceMock(nodeSettings);
|
||||
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(mock, nodeSettings);
|
||||
assertThat(discoveryNodes, hasSize(2));
|
||||
List<TransportAddress> dynamicHosts = buildDynamicNodes(mock, nodeSettings);
|
||||
assertThat(dynamicHosts, hasSize(2));
|
||||
}
|
||||
|
||||
public void testNodesWithDifferentTagsAndOneTagSet() {
|
||||
|
@ -131,9 +130,8 @@ public class GceDiscoveryTests extends ESTestCase {
|
|||
.putList(GceUnicastHostsProvider.TAGS_SETTING.getKey(), "elasticsearch")
|
||||
.build();
|
||||
mock = new GceInstancesServiceMock(nodeSettings);
|
||||
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(mock, nodeSettings);
|
||||
assertThat(discoveryNodes, hasSize(1));
|
||||
assertThat(discoveryNodes.get(0).getId(), is("#cloud-test2-0"));
|
||||
List<TransportAddress> dynamicHosts = buildDynamicNodes(mock, nodeSettings);
|
||||
assertThat(dynamicHosts, hasSize(1));
|
||||
}
|
||||
|
||||
public void testNodesWithDifferentTagsAndTwoTagSet() {
|
||||
|
@ -143,9 +141,8 @@ public class GceDiscoveryTests extends ESTestCase {
|
|||
.putList(GceUnicastHostsProvider.TAGS_SETTING.getKey(), "elasticsearch", "dev")
|
||||
.build();
|
||||
mock = new GceInstancesServiceMock(nodeSettings);
|
||||
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(mock, nodeSettings);
|
||||
assertThat(discoveryNodes, hasSize(1));
|
||||
assertThat(discoveryNodes.get(0).getId(), is("#cloud-test2-0"));
|
||||
List<TransportAddress> dynamicHosts = buildDynamicNodes(mock, nodeSettings);
|
||||
assertThat(dynamicHosts, hasSize(1));
|
||||
}
|
||||
|
||||
public void testNodesWithSameTagsAndNoTagSet() {
|
||||
|
@ -154,8 +151,8 @@ public class GceDiscoveryTests extends ESTestCase {
|
|||
.put(GceInstancesServiceImpl.ZONE_SETTING.getKey(), "europe-west1-b")
|
||||
.build();
|
||||
mock = new GceInstancesServiceMock(nodeSettings);
|
||||
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(mock, nodeSettings);
|
||||
assertThat(discoveryNodes, hasSize(2));
|
||||
List<TransportAddress> dynamicHosts = buildDynamicNodes(mock, nodeSettings);
|
||||
assertThat(dynamicHosts, hasSize(2));
|
||||
}
|
||||
|
||||
public void testNodesWithSameTagsAndOneTagSet() {
|
||||
|
@ -165,8 +162,8 @@ public class GceDiscoveryTests extends ESTestCase {
|
|||
.putList(GceUnicastHostsProvider.TAGS_SETTING.getKey(), "elasticsearch")
|
||||
.build();
|
||||
mock = new GceInstancesServiceMock(nodeSettings);
|
||||
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(mock, nodeSettings);
|
||||
assertThat(discoveryNodes, hasSize(2));
|
||||
List<TransportAddress> dynamicHosts = buildDynamicNodes(mock, nodeSettings);
|
||||
assertThat(dynamicHosts, hasSize(2));
|
||||
}
|
||||
|
||||
public void testNodesWithSameTagsAndTwoTagsSet() {
|
||||
|
@ -176,8 +173,8 @@ public class GceDiscoveryTests extends ESTestCase {
|
|||
.putList(GceUnicastHostsProvider.TAGS_SETTING.getKey(), "elasticsearch", "dev")
|
||||
.build();
|
||||
mock = new GceInstancesServiceMock(nodeSettings);
|
||||
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(mock, nodeSettings);
|
||||
assertThat(discoveryNodes, hasSize(2));
|
||||
List<TransportAddress> dynamicHosts = buildDynamicNodes(mock, nodeSettings);
|
||||
assertThat(dynamicHosts, hasSize(2));
|
||||
}
|
||||
|
||||
public void testMultipleZonesAndTwoNodesInSameZone() {
|
||||
|
@ -186,8 +183,8 @@ public class GceDiscoveryTests extends ESTestCase {
|
|||
.putList(GceInstancesServiceImpl.ZONE_SETTING.getKey(), "us-central1-a", "europe-west1-b")
|
||||
.build();
|
||||
mock = new GceInstancesServiceMock(nodeSettings);
|
||||
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(mock, nodeSettings);
|
||||
assertThat(discoveryNodes, hasSize(2));
|
||||
List<TransportAddress> dynamicHosts = buildDynamicNodes(mock, nodeSettings);
|
||||
assertThat(dynamicHosts, hasSize(2));
|
||||
}
|
||||
|
||||
public void testMultipleZonesAndTwoNodesInDifferentZones() {
|
||||
|
@ -196,8 +193,8 @@ public class GceDiscoveryTests extends ESTestCase {
|
|||
.putList(GceInstancesServiceImpl.ZONE_SETTING.getKey(), "us-central1-a", "europe-west1-b")
|
||||
.build();
|
||||
mock = new GceInstancesServiceMock(nodeSettings);
|
||||
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(mock, nodeSettings);
|
||||
assertThat(discoveryNodes, hasSize(2));
|
||||
List<TransportAddress> dynamicHosts = buildDynamicNodes(mock, nodeSettings);
|
||||
assertThat(dynamicHosts, hasSize(2));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -209,8 +206,8 @@ public class GceDiscoveryTests extends ESTestCase {
|
|||
.putList(GceInstancesServiceImpl.ZONE_SETTING.getKey(), "us-central1-a", "us-central1-b")
|
||||
.build();
|
||||
mock = new GceInstancesServiceMock(nodeSettings);
|
||||
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(mock, nodeSettings);
|
||||
assertThat(discoveryNodes, hasSize(0));
|
||||
List<TransportAddress> dynamicHosts = buildDynamicNodes(mock, nodeSettings);
|
||||
assertThat(dynamicHosts, hasSize(0));
|
||||
}
|
||||
|
||||
public void testIllegalSettingsMissingAllRequired() {
|
||||
|
@ -261,7 +258,7 @@ public class GceDiscoveryTests extends ESTestCase {
|
|||
.putList(GceInstancesServiceImpl.ZONE_SETTING.getKey(), "europe-west1-b", "us-central1-a")
|
||||
.build();
|
||||
mock = new GceInstancesServiceMock(nodeSettings);
|
||||
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(mock, nodeSettings);
|
||||
assertThat(discoveryNodes, hasSize(1));
|
||||
List<TransportAddress> dynamicHosts = buildDynamicNodes(mock, nodeSettings);
|
||||
assertThat(dynamicHosts, hasSize(1));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,7 +19,7 @@
|
|||
|
||||
package org.elasticsearch.discovery.zen;
|
||||
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
|
@ -31,5 +31,5 @@ public interface UnicastHostsProvider {
|
|||
/**
|
||||
* Builds the dynamic list of unicast hosts to be used for unicast discovery.
|
||||
*/
|
||||
List<DiscoveryNode> buildDynamicNodes();
|
||||
List<TransportAddress> buildDynamicHosts();
|
||||
}
|
||||
|
|
|
@ -118,9 +118,6 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
|
|||
|
||||
private final AtomicInteger pingingRoundIdGenerator = new AtomicInteger();
|
||||
|
||||
// used as a node id prefix for configured unicast host nodes/address
|
||||
private static final String UNICAST_NODE_PREFIX = "#zen_unicast_";
|
||||
|
||||
private final Map<Integer, PingingRound> activePingingRounds = newConcurrentMap();
|
||||
|
||||
// a list of temporal responses a node will return for a request (holds responses from other nodes)
|
||||
|
@ -184,23 +181,20 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
|
|||
* @param hosts the hosts to resolve
|
||||
* @param limitPortCounts the number of ports to resolve (should be 1 for non-local transport)
|
||||
* @param transportService the transport service
|
||||
* @param nodeId_prefix a prefix to use for node ids
|
||||
* @param resolveTimeout the timeout before returning from hostname lookups
|
||||
* @return a list of discovery nodes with resolved transport addresses
|
||||
* @return a list of resolved transport addresses
|
||||
*/
|
||||
public static List<DiscoveryNode> resolveHostsLists(
|
||||
public static List<TransportAddress> resolveHostsLists(
|
||||
final ExecutorService executorService,
|
||||
final Logger logger,
|
||||
final List<String> hosts,
|
||||
final int limitPortCounts,
|
||||
final TransportService transportService,
|
||||
final String nodeId_prefix,
|
||||
final TimeValue resolveTimeout) throws InterruptedException {
|
||||
Objects.requireNonNull(executorService);
|
||||
Objects.requireNonNull(logger);
|
||||
Objects.requireNonNull(hosts);
|
||||
Objects.requireNonNull(transportService);
|
||||
Objects.requireNonNull(nodeId_prefix);
|
||||
Objects.requireNonNull(resolveTimeout);
|
||||
if (resolveTimeout.nanos() < 0) {
|
||||
throw new IllegalArgumentException("resolve timeout must be non-negative but was [" + resolveTimeout + "]");
|
||||
|
@ -213,7 +207,7 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
|
|||
.collect(Collectors.toList());
|
||||
final List<Future<TransportAddress[]>> futures =
|
||||
executorService.invokeAll(callables, resolveTimeout.nanos(), TimeUnit.NANOSECONDS);
|
||||
final List<DiscoveryNode> discoveryNodes = new ArrayList<>();
|
||||
final List<TransportAddress> transportAddresses = new ArrayList<>();
|
||||
final Set<TransportAddress> localAddresses = new HashSet<>();
|
||||
localAddresses.add(transportService.boundAddress().publishAddress());
|
||||
localAddresses.addAll(Arrays.asList(transportService.boundAddress().boundAddresses()));
|
||||
|
@ -231,13 +225,7 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
|
|||
final TransportAddress address = addresses[addressId];
|
||||
// no point in pinging ourselves
|
||||
if (localAddresses.contains(address) == false) {
|
||||
discoveryNodes.add(
|
||||
new DiscoveryNode(
|
||||
nodeId_prefix + hostname + "_" + addressId + "#",
|
||||
address,
|
||||
emptyMap(),
|
||||
emptySet(),
|
||||
Version.CURRENT.minimumCompatibilityVersion()));
|
||||
transportAddresses.add(address);
|
||||
}
|
||||
}
|
||||
} catch (final ExecutionException e) {
|
||||
|
@ -249,7 +237,7 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
|
|||
logger.warn("timed out after [{}] resolving host [{}]", resolveTimeout, hostname);
|
||||
}
|
||||
}
|
||||
return discoveryNodes;
|
||||
return Collections.unmodifiableList(transportAddresses);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -292,29 +280,28 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
|
|||
protected void ping(final Consumer<PingCollection> resultsConsumer,
|
||||
final TimeValue scheduleDuration,
|
||||
final TimeValue requestDuration) {
|
||||
final List<DiscoveryNode> seedNodes;
|
||||
final List<TransportAddress> seedAddresses = new ArrayList<>();
|
||||
try {
|
||||
seedNodes = resolveHostsLists(
|
||||
seedAddresses.addAll(resolveHostsLists(
|
||||
unicastZenPingExecutorService,
|
||||
logger,
|
||||
configuredHosts,
|
||||
limitPortCounts,
|
||||
transportService,
|
||||
UNICAST_NODE_PREFIX,
|
||||
resolveTimeout);
|
||||
resolveTimeout));
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
seedNodes.addAll(hostsProvider.buildDynamicNodes());
|
||||
seedAddresses.addAll(hostsProvider.buildDynamicHosts());
|
||||
final DiscoveryNodes nodes = contextProvider.clusterState().nodes();
|
||||
// add all possible master nodes that were active in the last known cluster configuration
|
||||
for (ObjectCursor<DiscoveryNode> masterNode : nodes.getMasterNodes().values()) {
|
||||
seedNodes.add(masterNode.value);
|
||||
seedAddresses.add(masterNode.value.getAddress());
|
||||
}
|
||||
|
||||
final ConnectionProfile connectionProfile =
|
||||
ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, requestDuration, requestDuration);
|
||||
final PingingRound pingingRound = new PingingRound(pingingRoundIdGenerator.incrementAndGet(), seedNodes, resultsConsumer,
|
||||
final PingingRound pingingRound = new PingingRound(pingingRoundIdGenerator.incrementAndGet(), seedAddresses, resultsConsumer,
|
||||
nodes.getLocalNode(), connectionProfile);
|
||||
activePingingRounds.put(pingingRound.id(), pingingRound);
|
||||
final AbstractRunnable pingSender = new AbstractRunnable() {
|
||||
|
@ -356,17 +343,17 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
|
|||
private final Map<TransportAddress, Connection> tempConnections = new HashMap<>();
|
||||
private final KeyedLock<TransportAddress> connectionLock = new KeyedLock<>(true);
|
||||
private final PingCollection pingCollection;
|
||||
private final List<DiscoveryNode> seedNodes;
|
||||
private final List<TransportAddress> seedAddresses;
|
||||
private final Consumer<PingCollection> pingListener;
|
||||
private final DiscoveryNode localNode;
|
||||
private final ConnectionProfile connectionProfile;
|
||||
|
||||
private AtomicBoolean closed = new AtomicBoolean(false);
|
||||
|
||||
PingingRound(int id, List<DiscoveryNode> seedNodes, Consumer<PingCollection> resultsConsumer, DiscoveryNode localNode,
|
||||
PingingRound(int id, List<TransportAddress> seedAddresses, Consumer<PingCollection> resultsConsumer, DiscoveryNode localNode,
|
||||
ConnectionProfile connectionProfile) {
|
||||
this.id = id;
|
||||
this.seedNodes = Collections.unmodifiableList(new ArrayList<>(seedNodes));
|
||||
this.seedAddresses = Collections.unmodifiableList(seedAddresses.stream().distinct().collect(Collectors.toList()));
|
||||
this.pingListener = resultsConsumer;
|
||||
this.localNode = localNode;
|
||||
this.connectionProfile = connectionProfile;
|
||||
|
@ -381,9 +368,9 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
|
|||
return this.closed.get();
|
||||
}
|
||||
|
||||
public List<DiscoveryNode> getSeedNodes() {
|
||||
public List<TransportAddress> getSeedAddresses() {
|
||||
ensureOpen();
|
||||
return seedNodes;
|
||||
return seedAddresses;
|
||||
}
|
||||
|
||||
public Connection getOrConnect(DiscoveryNode node) throws IOException {
|
||||
|
@ -457,26 +444,28 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
|
|||
final ClusterState lastState = contextProvider.clusterState();
|
||||
final UnicastPingRequest pingRequest = new UnicastPingRequest(pingingRound.id(), timeout, createPingResponse(lastState));
|
||||
|
||||
Set<DiscoveryNode> nodesFromResponses = temporalResponses.stream().map(pingResponse -> {
|
||||
List<TransportAddress> temporalAddresses = temporalResponses.stream().map(pingResponse -> {
|
||||
assert clusterName.equals(pingResponse.clusterName()) :
|
||||
"got a ping request from a different cluster. expected " + clusterName + " got " + pingResponse.clusterName();
|
||||
return pingResponse.node();
|
||||
}).collect(Collectors.toSet());
|
||||
|
||||
// dedup by address
|
||||
final Map<TransportAddress, DiscoveryNode> uniqueNodesByAddress =
|
||||
Stream.concat(pingingRound.getSeedNodes().stream(), nodesFromResponses.stream())
|
||||
.collect(Collectors.toMap(DiscoveryNode::getAddress, Function.identity(), (n1, n2) -> n1));
|
||||
return pingResponse.node().getAddress();
|
||||
}).collect(Collectors.toList());
|
||||
|
||||
final Stream<TransportAddress> uniqueAddresses = Stream.concat(pingingRound.getSeedAddresses().stream(),
|
||||
temporalAddresses.stream()).distinct();
|
||||
|
||||
// resolve what we can via the latest cluster state
|
||||
final Set<DiscoveryNode> nodesToPing = uniqueNodesByAddress.values().stream()
|
||||
.map(node -> {
|
||||
DiscoveryNode foundNode = lastState.nodes().findByAddress(node.getAddress());
|
||||
if (foundNode == null) {
|
||||
return node;
|
||||
} else {
|
||||
final Set<DiscoveryNode> nodesToPing = uniqueAddresses
|
||||
.map(address -> {
|
||||
DiscoveryNode foundNode = lastState.nodes().findByAddress(address);
|
||||
if (foundNode != null && transportService.nodeConnected(foundNode)) {
|
||||
return foundNode;
|
||||
} else {
|
||||
return new DiscoveryNode(
|
||||
address.toString(),
|
||||
address,
|
||||
emptyMap(),
|
||||
emptySet(),
|
||||
Version.CURRENT.minimumCompatibilityVersion());
|
||||
}
|
||||
}).collect(Collectors.toSet());
|
||||
|
||||
|
|
|
@ -84,7 +84,7 @@ public class SingleNodeDiscoveryIT extends ESIntegTestCase {
|
|||
internalCluster().getInstance(TransportService.class);
|
||||
// try to ping the single node directly
|
||||
final UnicastHostsProvider provider =
|
||||
() -> Collections.singletonList(nodeTransport.getLocalNode());
|
||||
() -> Collections.singletonList(nodeTransport.getLocalNode().getAddress());
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
final DiscoveryNodes nodes = DiscoveryNodes.builder()
|
||||
.add(nodeTransport.getLocalNode())
|
||||
|
|
|
@ -408,19 +408,18 @@ public class UnicastZenPingTests extends ESTestCase {
|
|||
Collections.emptySet());
|
||||
closeables.push(transportService);
|
||||
final int limitPortCounts = randomIntBetween(1, 10);
|
||||
final List<DiscoveryNode> discoveryNodes = TestUnicastZenPing.resolveHostsLists(
|
||||
final List<TransportAddress> transportAddresses = TestUnicastZenPing.resolveHostsLists(
|
||||
executorService,
|
||||
logger,
|
||||
Collections.singletonList("127.0.0.1"),
|
||||
limitPortCounts,
|
||||
transportService,
|
||||
"test_",
|
||||
TimeValue.timeValueSeconds(1));
|
||||
assertThat(discoveryNodes, hasSize(limitPortCounts));
|
||||
assertThat(transportAddresses, hasSize(limitPortCounts));
|
||||
final Set<Integer> ports = new HashSet<>();
|
||||
for (final DiscoveryNode discoveryNode : discoveryNodes) {
|
||||
assertTrue(discoveryNode.getAddress().address().getAddress().isLoopbackAddress());
|
||||
ports.add(discoveryNode.getAddress().getPort());
|
||||
for (final TransportAddress address : transportAddresses) {
|
||||
assertTrue(address.address().getAddress().isLoopbackAddress());
|
||||
ports.add(address.getPort());
|
||||
}
|
||||
assertThat(ports, equalTo(IntStream.range(9300, 9300 + limitPortCounts).mapToObj(m -> m).collect(Collectors.toSet())));
|
||||
}
|
||||
|
@ -453,19 +452,18 @@ public class UnicastZenPingTests extends ESTestCase {
|
|||
new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null,
|
||||
Collections.emptySet());
|
||||
closeables.push(transportService);
|
||||
final List<DiscoveryNode> discoveryNodes = TestUnicastZenPing.resolveHostsLists(
|
||||
final List<TransportAddress> transportAddresses = TestUnicastZenPing.resolveHostsLists(
|
||||
executorService,
|
||||
logger,
|
||||
Collections.singletonList(NetworkAddress.format(loopbackAddress)),
|
||||
10,
|
||||
transportService,
|
||||
"test_",
|
||||
TimeValue.timeValueSeconds(1));
|
||||
assertThat(discoveryNodes, hasSize(7));
|
||||
assertThat(transportAddresses, hasSize(7));
|
||||
final Set<Integer> ports = new HashSet<>();
|
||||
for (final DiscoveryNode discoveryNode : discoveryNodes) {
|
||||
assertTrue(discoveryNode.getAddress().address().getAddress().isLoopbackAddress());
|
||||
ports.add(discoveryNode.getAddress().getPort());
|
||||
for (final TransportAddress address : transportAddresses) {
|
||||
assertTrue(address.address().getAddress().isLoopbackAddress());
|
||||
ports.add(address.getPort());
|
||||
}
|
||||
assertThat(ports, equalTo(IntStream.range(9303, 9310).mapToObj(m -> m).collect(Collectors.toSet())));
|
||||
}
|
||||
|
@ -505,17 +503,16 @@ public class UnicastZenPingTests extends ESTestCase {
|
|||
Collections.emptySet());
|
||||
closeables.push(transportService);
|
||||
|
||||
final List<DiscoveryNode> discoveryNodes = TestUnicastZenPing.resolveHostsLists(
|
||||
final List<TransportAddress> transportAddresses = TestUnicastZenPing.resolveHostsLists(
|
||||
executorService,
|
||||
logger,
|
||||
Arrays.asList(hostname),
|
||||
1,
|
||||
transportService,
|
||||
"test_",
|
||||
TimeValue.timeValueSeconds(1)
|
||||
);
|
||||
|
||||
assertThat(discoveryNodes, empty());
|
||||
assertThat(transportAddresses, empty());
|
||||
verify(logger).warn("failed to resolve host [" + hostname + "]", unknownHostException);
|
||||
}
|
||||
|
||||
|
@ -565,16 +562,15 @@ public class UnicastZenPingTests extends ESTestCase {
|
|||
closeables.push(transportService);
|
||||
final TimeValue resolveTimeout = TimeValue.timeValueSeconds(randomIntBetween(1, 3));
|
||||
try {
|
||||
final List<DiscoveryNode> discoveryNodes = TestUnicastZenPing.resolveHostsLists(
|
||||
final List<TransportAddress> transportAddresses = TestUnicastZenPing.resolveHostsLists(
|
||||
executorService,
|
||||
logger,
|
||||
Arrays.asList("hostname1", "hostname2"),
|
||||
1,
|
||||
transportService,
|
||||
"test+",
|
||||
resolveTimeout);
|
||||
|
||||
assertThat(discoveryNodes, hasSize(1));
|
||||
assertThat(transportAddresses, hasSize(1));
|
||||
verify(logger).trace(
|
||||
"resolved host [{}] to {}", "hostname1",
|
||||
new TransportAddress[]{new TransportAddress(TransportAddress.META_ADDRESS, 9300)});
|
||||
|
@ -732,17 +728,16 @@ public class UnicastZenPingTests extends ESTestCase {
|
|||
new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null,
|
||||
Collections.emptySet());
|
||||
closeables.push(transportService);
|
||||
final List<DiscoveryNode> discoveryNodes = TestUnicastZenPing.resolveHostsLists(
|
||||
final List<TransportAddress> transportAddresses = TestUnicastZenPing.resolveHostsLists(
|
||||
executorService,
|
||||
logger,
|
||||
Arrays.asList("127.0.0.1:9300:9300", "127.0.0.1:9301"),
|
||||
1,
|
||||
transportService,
|
||||
"test_",
|
||||
TimeValue.timeValueSeconds(1));
|
||||
assertThat(discoveryNodes, hasSize(1)); // only one of the two is valid and will be used
|
||||
assertThat(discoveryNodes.get(0).getAddress().getAddress(), equalTo("127.0.0.1"));
|
||||
assertThat(discoveryNodes.get(0).getAddress().getPort(), equalTo(9301));
|
||||
assertThat(transportAddresses, hasSize(1)); // only one of the two is valid and will be used
|
||||
assertThat(transportAddresses.get(0).getAddress(), equalTo("127.0.0.1"));
|
||||
assertThat(transportAddresses.get(0).getPort(), equalTo(9301));
|
||||
verify(logger).warn(eq("failed to resolve host [127.0.0.1:9300:9300]"), Matchers.any(ExecutionException.class));
|
||||
}
|
||||
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.elasticsearch.test.discovery;
|
|||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||
import org.elasticsearch.discovery.zen.UnicastHostsProvider;
|
||||
|
||||
|
@ -55,7 +56,7 @@ public final class MockUncasedHostProvider implements UnicastHostsProvider, Clos
|
|||
}
|
||||
|
||||
@Override
|
||||
public List<DiscoveryNode> buildDynamicNodes() {
|
||||
public List<TransportAddress> buildDynamicHosts() {
|
||||
final DiscoveryNode localNode = getNode();
|
||||
assert localNode != null;
|
||||
synchronized (activeNodesPerCluster) {
|
||||
|
@ -64,6 +65,7 @@ public final class MockUncasedHostProvider implements UnicastHostsProvider, Clos
|
|||
.map(MockUncasedHostProvider::getNode)
|
||||
.filter(Objects::nonNull)
|
||||
.filter(n -> localNode.equals(n) == false)
|
||||
.map(DiscoveryNode::getAddress)
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue