diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java b/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java index 176ac5763e3..231fa3ee070 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java @@ -160,18 +160,10 @@ public class UnicastZenPing extends AbstractLifecycleComponent implements ZenPin } logger.debug("using initial hosts {}, with concurrent_connects [{}]", hosts, concurrentConnects); - List configuredTargetNodes = new ArrayList<>(); - for (String host : hosts) { - try { - TransportAddress[] addresses = transportService.addressesFromString(host, limitPortCounts); - for (TransportAddress address : addresses) { - configuredTargetNodes.add(new DiscoveryNode(UNICAST_NODE_PREFIX + unicastNodeIdGenerator.incrementAndGet() + "#", - address, emptyMap(), emptySet(), getVersion().minimumCompatibilityVersion())); - } - } catch (Exception e) { - throw new IllegalArgumentException("Failed to resolve address for [" + host + "]", e); - } + for (final String host : hosts) { + configuredTargetNodes.addAll(resolveDiscoveryNodes(host, limitPortCounts, transportService, + () -> UNICAST_NODE_PREFIX + unicastNodeIdGenerator.incrementAndGet() + "#")); } this.configuredTargetNodes = configuredTargetNodes.toArray(new DiscoveryNode[configuredTargetNodes.size()]); @@ -183,6 +175,32 @@ public class UnicastZenPing extends AbstractLifecycleComponent implements ZenPin threadFactory, threadPool.getThreadContext()); } + /** + * Resolves a host to a list of discovery nodes. The host is resolved into a transport + * address (or a collection of addresses if the number of ports is greater than one) and + * the transport addresses are used to created discovery nodes. + * + * @param host the host to resolve + * @param limitPortCounts the number of ports to resolve (should be 1 for non-local transport) + * @param transportService the transport service + * @param idGenerator the generator to supply unique ids for each discovery node + * @return a list of discovery nodes with resolved transport addresses + */ + public static List resolveDiscoveryNodes(final String host, final int limitPortCounts, + final TransportService transportService, final Supplier idGenerator) { + List discoveryNodes = new ArrayList<>(); + try { + TransportAddress[] addresses = transportService.addressesFromString(host, limitPortCounts); + for (TransportAddress address : addresses) { + discoveryNodes.add(new DiscoveryNode(idGenerator.get(), address, emptyMap(), emptySet(), + Version.CURRENT.minimumCompatibilityVersion())); + } + } catch (Exception e) { + throw new IllegalArgumentException("Failed to resolve address for [" + host + "]", e); + } + return discoveryNodes; + } + @Override protected void doStart() { } diff --git a/dev-tools/smoke_test_rc.py b/dev-tools/smoke_test_rc.py index 33abbf96345..883a62210c9 100644 --- a/dev-tools/smoke_test_rc.py +++ b/dev-tools/smoke_test_rc.py @@ -65,6 +65,7 @@ DEFAULT_PLUGINS = ["analysis-icu", "analysis-stempel", "discovery-azure-classic", "discovery-ec2", + "discovery-file", "discovery-gce", "ingest-attachment", "ingest-geoip", diff --git a/plugins/discovery-file/build.gradle b/plugins/discovery-file/build.gradle new file mode 100644 index 00000000000..36fd87e37a9 --- /dev/null +++ b/plugins/discovery-file/build.gradle @@ -0,0 +1,29 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +esplugin { + description 'Discovery file plugin enables unicast discovery from hosts stored in a file.' + classname 'org.elasticsearch.discovery.file.FileBasedDiscoveryPlugin' +} + +bundlePlugin { + from('config/discovery-file') { + into 'config' + } +} diff --git a/plugins/discovery-file/config/discovery-file/unicast_hosts.txt b/plugins/discovery-file/config/discovery-file/unicast_hosts.txt new file mode 100644 index 00000000000..5e265e0f295 --- /dev/null +++ b/plugins/discovery-file/config/discovery-file/unicast_hosts.txt @@ -0,0 +1,20 @@ +# The unicast_hosts.txt file contains the list of unicast hosts to connect to +# for pinging during the discovery process, when using the file-based discovery +# mechanism. This file should contain one entry per line, where an entry is a +# host/port combination. The host and port should be separated by a `:`. If +# the port is left off, a default port of 9300 is assumed. For example, if the +# cluster has three nodes that participate in the discovery process: +# (1) 66.77.88.99 running on port 9300 (2) 66.77.88.100 running on port 9305 +# and (3) 66.77.88.99 running on port 10005, then this file should contain the +# following text: +# +#10.10.10.5 +#10.10.10.6:9305 +#10.10.10.5:10005 +# +# For IPv6 addresses, make sure to put a bracket around the host part of the address, +# for example: [2001:cdba:0000:0000:0000:0000:3257:9652]:9301 (where 9301 is the port). +# +# NOTE: all lines starting with a `#` are comments, and comments must exist +# on lines of their own (i.e. comments cannot begin in the middle of a line) +# \ No newline at end of file diff --git a/plugins/discovery-file/src/main/java/org/elasticsearch/discovery/file/FileBasedDiscoveryPlugin.java b/plugins/discovery-file/src/main/java/org/elasticsearch/discovery/file/FileBasedDiscoveryPlugin.java new file mode 100644 index 00000000000..f781a3b7fe9 --- /dev/null +++ b/plugins/discovery-file/src/main/java/org/elasticsearch/discovery/file/FileBasedDiscoveryPlugin.java @@ -0,0 +1,51 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.discovery.file; + +import org.apache.logging.log4j.Logger; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.discovery.DiscoveryModule; +import org.elasticsearch.env.Environment; +import org.elasticsearch.plugins.DiscoveryPlugin; +import org.elasticsearch.plugins.Plugin; + +/** + * Plugin for providing file-based unicast hosts discovery. The list of unicast hosts + * is obtained by reading the {@link FileBasedUnicastHostsProvider#UNICAST_HOSTS_FILE} in + * the {@link Environment#configFile()}/discovery-file directory. + */ +public class FileBasedDiscoveryPlugin extends Plugin implements DiscoveryPlugin { + + private static final Logger logger = Loggers.getLogger(FileBasedDiscoveryPlugin.class); + + private final Settings settings; + + public FileBasedDiscoveryPlugin(Settings settings) { + this.settings = settings; + logger.trace("starting file-based discovery plugin..."); + } + + public void onModule(DiscoveryModule discoveryModule) { + logger.trace("registering file-based unicast hosts provider"); + // using zen discovery for the discovery type and we're just adding a unicast host provider for it + discoveryModule.addUnicastHostProvider("zen", FileBasedUnicastHostsProvider.class); + } +} diff --git a/plugins/discovery-file/src/main/java/org/elasticsearch/discovery/file/FileBasedUnicastHostsProvider.java b/plugins/discovery-file/src/main/java/org/elasticsearch/discovery/file/FileBasedUnicastHostsProvider.java new file mode 100644 index 00000000000..78393d34001 --- /dev/null +++ b/plugins/discovery-file/src/main/java/org/elasticsearch/discovery/file/FileBasedUnicastHostsProvider.java @@ -0,0 +1,109 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.discovery.file; + +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.logging.log4j.util.Supplier; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.discovery.zen.ping.unicast.UnicastHostsProvider; +import org.elasticsearch.env.Environment; +import org.elasticsearch.transport.TransportService; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.elasticsearch.discovery.zen.ping.unicast.UnicastZenPing.resolveDiscoveryNodes; + +/** + * An implementation of {@link UnicastHostsProvider} that reads hosts/ports + * from {@link #UNICAST_HOSTS_FILE}. + * + * Each unicast host/port that is part of the discovery process must be listed on + * a separate line. If the port is left off an entry, a default port of 9300 is + * assumed. An example unicast hosts file could read: + * + * 67.81.244.10 + * 67.81.244.11:9305 + * 67.81.244.15:9400 + */ +public class FileBasedUnicastHostsProvider extends AbstractComponent implements UnicastHostsProvider { + + static final String UNICAST_HOSTS_FILE = "unicast_hosts.txt"; + static final String UNICAST_HOST_PREFIX = "#zen_file_unicast_host_"; + + private final TransportService transportService; + + private final Path unicastHostsFilePath; + + private final AtomicLong nodeIdGenerator = new AtomicLong(); // generates unique ids for the node + + @Inject + public FileBasedUnicastHostsProvider(Settings settings, TransportService transportService) { + super(settings); + this.transportService = transportService; + this.unicastHostsFilePath = new Environment(settings).configFile().resolve("discovery-file").resolve(UNICAST_HOSTS_FILE); + } + + @Override + public List buildDynamicNodes() { + List hostsList; + try (Stream lines = Files.lines(unicastHostsFilePath)) { + hostsList = lines.filter(line -> line.startsWith("#") == false) // lines starting with `#` are comments + .collect(Collectors.toList()); + } catch (FileNotFoundException | NoSuchFileException e) { + logger.warn((Supplier) () -> new ParameterizedMessage("[discovery-file] Failed to find unicast hosts file [{}]", + unicastHostsFilePath), e); + hostsList = Collections.emptyList(); + } catch (IOException e) { + logger.warn((Supplier) () -> new ParameterizedMessage("[discovery-file] Error reading unicast hosts file [{}]", + unicastHostsFilePath), e); + hostsList = Collections.emptyList(); + } + + final List discoNodes = new ArrayList<>(); + for (final String host : hostsList) { + try { + discoNodes.addAll(resolveDiscoveryNodes(host, 1, transportService, + () -> UNICAST_HOST_PREFIX + nodeIdGenerator.incrementAndGet() + "#")); + } catch (IllegalArgumentException e) { + logger.warn((Supplier) () -> new ParameterizedMessage("[discovery-file] Failed to parse transport address from [{}]", + host), e); + continue; + } + } + + logger.debug("[discovery-file] Using dynamic discovery nodes {}", discoNodes); + + return discoNodes; + } + +} diff --git a/plugins/discovery-file/src/test/java/org/elasticsearch/discovery/file/FileBasedDiscoveryClientYamlTestSuiteIT.java b/plugins/discovery-file/src/test/java/org/elasticsearch/discovery/file/FileBasedDiscoveryClientYamlTestSuiteIT.java new file mode 100644 index 00000000000..45905a152ce --- /dev/null +++ b/plugins/discovery-file/src/test/java/org/elasticsearch/discovery/file/FileBasedDiscoveryClientYamlTestSuiteIT.java @@ -0,0 +1,43 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.discovery.file; + +import com.carrotsearch.randomizedtesting.annotations.Name; +import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; +import org.elasticsearch.test.rest.yaml.ClientYamlTestCandidate; +import org.elasticsearch.test.rest.yaml.ESClientYamlSuiteTestCase; +import org.elasticsearch.test.rest.yaml.parser.ClientYamlTestParseException; + +import java.io.IOException; + +/** + * Integration tests to make sure the file-based discovery plugin works in a cluster. + */ +public class FileBasedDiscoveryClientYamlTestSuiteIT extends ESClientYamlSuiteTestCase { + + public FileBasedDiscoveryClientYamlTestSuiteIT(@Name("yaml") ClientYamlTestCandidate testCandidate) { + super(testCandidate); + } + + @ParametersFactory + public static Iterable parameters() throws IOException, ClientYamlTestParseException { + return ESClientYamlSuiteTestCase.createParameters(0, 1); + } +} diff --git a/plugins/discovery-file/src/test/java/org/elasticsearch/discovery/file/FileBasedUnicastHostsProviderTests.java b/plugins/discovery-file/src/test/java/org/elasticsearch/discovery/file/FileBasedUnicastHostsProviderTests.java new file mode 100644 index 00000000000..abd91c8c07f --- /dev/null +++ b/plugins/discovery-file/src/test/java/org/elasticsearch/discovery/file/FileBasedUnicastHostsProviderTests.java @@ -0,0 +1,139 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.discovery.file; + +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.network.NetworkService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.env.Environment; +import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.MockTcpTransport; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; + +import java.io.BufferedWriter; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.elasticsearch.discovery.file.FileBasedUnicastHostsProvider.UNICAST_HOSTS_FILE; +import static org.elasticsearch.discovery.file.FileBasedUnicastHostsProvider.UNICAST_HOST_PREFIX; + +/** + * Tests for {@link FileBasedUnicastHostsProvider}. + */ +public class FileBasedUnicastHostsProviderTests extends ESTestCase { + + private static ThreadPool threadPool; + private MockTransportService transportService; + + @BeforeClass + public static void createThreadPool() { + threadPool = new TestThreadPool(FileBasedUnicastHostsProviderTests.class.getName()); + } + + @AfterClass + public static void stopThreadPool() throws InterruptedException { + terminate(threadPool); + } + + @Before + public void createTransportSvc() { + MockTcpTransport transport = + new MockTcpTransport(Settings.EMPTY, + threadPool, + BigArrays.NON_RECYCLING_INSTANCE, + new NoneCircuitBreakerService(), + new NamedWriteableRegistry(Collections.emptyList()), + new NetworkService(Settings.EMPTY, Collections.emptyList())); + transportService = new MockTransportService(Settings.EMPTY, transport, threadPool); + } + + public void testBuildDynamicNodes() throws Exception { + final List hostEntries = Arrays.asList("#comment, should be ignored", "192.168.0.1", "192.168.0.2:9305", "255.255.23.15"); + final List nodes = setupAndRunHostProvider(hostEntries); + assertEquals(hostEntries.size() - 1, nodes.size()); // minus 1 because we are ignoring the first line that's a comment + assertEquals("192.168.0.1", nodes.get(0).getAddress().getHost()); + assertEquals(9300, nodes.get(0).getAddress().getPort()); + assertEquals(UNICAST_HOST_PREFIX + "1#", nodes.get(0).getId()); + assertEquals("192.168.0.2", nodes.get(1).getAddress().getHost()); + assertEquals(9305, nodes.get(1).getAddress().getPort()); + assertEquals(UNICAST_HOST_PREFIX + "2#", nodes.get(1).getId()); + assertEquals("255.255.23.15", nodes.get(2).getAddress().getHost()); + assertEquals(9300, nodes.get(2).getAddress().getPort()); + assertEquals(UNICAST_HOST_PREFIX + "3#", nodes.get(2).getId()); + } + + public void testEmptyUnicastHostsFile() throws Exception { + final List hostEntries = Collections.emptyList(); + final List nodes = setupAndRunHostProvider(hostEntries); + assertEquals(0, nodes.size()); + } + + public void testUnicastHostsDoesNotExist() throws Exception { + final Settings settings = Settings.builder() + .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir()) + .build(); + final FileBasedUnicastHostsProvider provider = new FileBasedUnicastHostsProvider(settings, transportService); + final List nodes = provider.buildDynamicNodes(); + assertEquals(0, nodes.size()); + } + + public void testInvalidHostEntries() throws Exception { + List hostEntries = Arrays.asList("192.168.0.1:9300:9300"); + List nodes = setupAndRunHostProvider(hostEntries); + assertEquals(0, nodes.size()); + } + + public void testSomeInvalidHostEntries() throws Exception { + List hostEntries = Arrays.asList("192.168.0.1:9300:9300", "192.168.0.1:9301"); + List nodes = setupAndRunHostProvider(hostEntries); + assertEquals(1, nodes.size()); // only one of the two is valid and will be used + assertEquals("192.168.0.1", nodes.get(0).getAddress().getHost()); + assertEquals(9301, nodes.get(0).getAddress().getPort()); + } + + // sets up the config dir, writes to the unicast hosts file in the config dir, + // and then runs the file-based unicast host provider to get the list of discovery nodes + private List setupAndRunHostProvider(final List hostEntries) throws IOException { + final Path homeDir = createTempDir(); + final Settings settings = Settings.builder() + .put(Environment.PATH_HOME_SETTING.getKey(), homeDir) + .build(); + final Path configDir = homeDir.resolve("config").resolve("discovery-file"); + Files.createDirectories(configDir); + final Path unicastHostsPath = configDir.resolve(UNICAST_HOSTS_FILE); + try (BufferedWriter writer = Files.newBufferedWriter(unicastHostsPath)) { + writer.write(String.join("\n", hostEntries)); + } + + return new FileBasedUnicastHostsProvider(settings, transportService).buildDynamicNodes(); + } +} diff --git a/plugins/discovery-file/src/test/resources/rest-api-spec/test/discovery_file/10_basic.yaml b/plugins/discovery-file/src/test/resources/rest-api-spec/test/discovery_file/10_basic.yaml new file mode 100644 index 00000000000..74ba6b54e3c --- /dev/null +++ b/plugins/discovery-file/src/test/resources/rest-api-spec/test/discovery_file/10_basic.yaml @@ -0,0 +1,13 @@ +# Integration tests for file-based discovery +# +"Discovery File loaded": + - do: + cluster.state: {} + + # Get master node id + - set: { master_node: master } + + - do: + nodes.info: {} + + - match: { nodes.$master.plugins.0.name: discovery-file } diff --git a/qa/vagrant/src/test/resources/packaging/scripts/module_and_plugin_test_cases.bash b/qa/vagrant/src/test/resources/packaging/scripts/module_and_plugin_test_cases.bash index c55d28e971c..54e0ea1570d 100644 --- a/qa/vagrant/src/test/resources/packaging/scripts/module_and_plugin_test_cases.bash +++ b/qa/vagrant/src/test/resources/packaging/scripts/module_and_plugin_test_cases.bash @@ -217,6 +217,10 @@ fi install_and_check_plugin discovery ec2 aws-java-sdk-core-*.jar } +@test "[$GROUP] install discovery-file plugin" { + install_and_check_plugin discovery file +} + @test "[$GROUP] install ingest-attachment plugin" { # we specify the version on the poi-3.15-beta1.jar so that the test does # not spuriously pass if the jar is missing but the other poi jars @@ -353,6 +357,10 @@ fi remove_plugin discovery-ec2 } +@test "[$GROUP] remove discovery-file plugin" { + remove_plugin discovery-file +} + @test "[$GROUP] remove ingest-attachment plugin" { remove_plugin ingest-attachment } diff --git a/settings.gradle b/settings.gradle index 904fb69469d..07720b386e5 100644 --- a/settings.gradle +++ b/settings.gradle @@ -38,6 +38,7 @@ List projects = [ 'plugins:analysis-stempel', 'plugins:discovery-azure-classic', 'plugins:discovery-ec2', + 'plugins:discovery-file', 'plugins:discovery-gce', 'plugins:ingest-geoip', 'plugins:ingest-attachment',