File-based discovery plugin (#20394)

This commit introduces a new plugin for file-based unicast hosts
discovery. This allows specifying the unicast hosts participating
in discovery through a `unicast_hosts.txt` file located in the
`config/discovery-file` directory. The plugin will use the hosts 
specified in this file as the set of hosts to ping during discovery.

The format of the `unicast_hosts.txt` file is to have one host/port
entry per line. The hosts file is read and parsed every time
discovery makes ping requests, thus a new version of the file that
is published to the config directory will automatically be picked
up.

Closes #20323
This commit is contained in:
Ali Beyad 2016-09-13 20:52:39 -04:00 committed by GitHub
parent ee4798d852
commit 4431720c3d
11 changed files with 443 additions and 11 deletions

View File

@ -160,18 +160,10 @@ public class UnicastZenPing extends AbstractLifecycleComponent implements ZenPin
}
logger.debug("using initial hosts {}, with concurrent_connects [{}]", hosts, concurrentConnects);
List<DiscoveryNode> configuredTargetNodes = new ArrayList<>();
for (String host : hosts) {
try {
TransportAddress[] addresses = transportService.addressesFromString(host, limitPortCounts);
for (TransportAddress address : addresses) {
configuredTargetNodes.add(new DiscoveryNode(UNICAST_NODE_PREFIX + unicastNodeIdGenerator.incrementAndGet() + "#",
address, emptyMap(), emptySet(), getVersion().minimumCompatibilityVersion()));
}
} catch (Exception e) {
throw new IllegalArgumentException("Failed to resolve address for [" + host + "]", e);
}
for (final String host : hosts) {
configuredTargetNodes.addAll(resolveDiscoveryNodes(host, limitPortCounts, transportService,
() -> UNICAST_NODE_PREFIX + unicastNodeIdGenerator.incrementAndGet() + "#"));
}
this.configuredTargetNodes = configuredTargetNodes.toArray(new DiscoveryNode[configuredTargetNodes.size()]);
@ -183,6 +175,32 @@ public class UnicastZenPing extends AbstractLifecycleComponent implements ZenPin
threadFactory, threadPool.getThreadContext());
}
/**
* Resolves a host to a list of discovery nodes. The host is resolved into a transport
* address (or a collection of addresses if the number of ports is greater than one) and
* the transport addresses are used to created discovery nodes.
*
* @param host the host to resolve
* @param limitPortCounts the number of ports to resolve (should be 1 for non-local transport)
* @param transportService the transport service
* @param idGenerator the generator to supply unique ids for each discovery node
* @return a list of discovery nodes with resolved transport addresses
*/
public static List<DiscoveryNode> resolveDiscoveryNodes(final String host, final int limitPortCounts,
final TransportService transportService, final Supplier<String> idGenerator) {
List<DiscoveryNode> discoveryNodes = new ArrayList<>();
try {
TransportAddress[] addresses = transportService.addressesFromString(host, limitPortCounts);
for (TransportAddress address : addresses) {
discoveryNodes.add(new DiscoveryNode(idGenerator.get(), address, emptyMap(), emptySet(),
Version.CURRENT.minimumCompatibilityVersion()));
}
} catch (Exception e) {
throw new IllegalArgumentException("Failed to resolve address for [" + host + "]", e);
}
return discoveryNodes;
}
@Override
protected void doStart() {
}

View File

@ -65,6 +65,7 @@ DEFAULT_PLUGINS = ["analysis-icu",
"analysis-stempel",
"discovery-azure-classic",
"discovery-ec2",
"discovery-file",
"discovery-gce",
"ingest-attachment",
"ingest-geoip",

View File

@ -0,0 +1,29 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
esplugin {
description 'Discovery file plugin enables unicast discovery from hosts stored in a file.'
classname 'org.elasticsearch.discovery.file.FileBasedDiscoveryPlugin'
}
bundlePlugin {
from('config/discovery-file') {
into 'config'
}
}

View File

@ -0,0 +1,20 @@
# The unicast_hosts.txt file contains the list of unicast hosts to connect to
# for pinging during the discovery process, when using the file-based discovery
# mechanism. This file should contain one entry per line, where an entry is a
# host/port combination. The host and port should be separated by a `:`. If
# the port is left off, a default port of 9300 is assumed. For example, if the
# cluster has three nodes that participate in the discovery process:
# (1) 66.77.88.99 running on port 9300 (2) 66.77.88.100 running on port 9305
# and (3) 66.77.88.99 running on port 10005, then this file should contain the
# following text:
#
#10.10.10.5
#10.10.10.6:9305
#10.10.10.5:10005
#
# For IPv6 addresses, make sure to put a bracket around the host part of the address,
# for example: [2001:cdba:0000:0000:0000:0000:3257:9652]:9301 (where 9301 is the port).
#
# NOTE: all lines starting with a `#` are comments, and comments must exist
# on lines of their own (i.e. comments cannot begin in the middle of a line)
#

View File

@ -0,0 +1,51 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.discovery.file;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.env.Environment;
import org.elasticsearch.plugins.DiscoveryPlugin;
import org.elasticsearch.plugins.Plugin;
/**
* Plugin for providing file-based unicast hosts discovery. The list of unicast hosts
* is obtained by reading the {@link FileBasedUnicastHostsProvider#UNICAST_HOSTS_FILE} in
* the {@link Environment#configFile()}/discovery-file directory.
*/
public class FileBasedDiscoveryPlugin extends Plugin implements DiscoveryPlugin {
private static final Logger logger = Loggers.getLogger(FileBasedDiscoveryPlugin.class);
private final Settings settings;
public FileBasedDiscoveryPlugin(Settings settings) {
this.settings = settings;
logger.trace("starting file-based discovery plugin...");
}
public void onModule(DiscoveryModule discoveryModule) {
logger.trace("registering file-based unicast hosts provider");
// using zen discovery for the discovery type and we're just adding a unicast host provider for it
discoveryModule.addUnicastHostProvider("zen", FileBasedUnicastHostsProvider.class);
}
}

View File

@ -0,0 +1,109 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.discovery.file;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.zen.ping.unicast.UnicastHostsProvider;
import org.elasticsearch.env.Environment;
import org.elasticsearch.transport.TransportService;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.elasticsearch.discovery.zen.ping.unicast.UnicastZenPing.resolveDiscoveryNodes;
/**
* An implementation of {@link UnicastHostsProvider} that reads hosts/ports
* from {@link #UNICAST_HOSTS_FILE}.
*
* Each unicast host/port that is part of the discovery process must be listed on
* a separate line. If the port is left off an entry, a default port of 9300 is
* assumed. An example unicast hosts file could read:
*
* 67.81.244.10
* 67.81.244.11:9305
* 67.81.244.15:9400
*/
public class FileBasedUnicastHostsProvider extends AbstractComponent implements UnicastHostsProvider {
static final String UNICAST_HOSTS_FILE = "unicast_hosts.txt";
static final String UNICAST_HOST_PREFIX = "#zen_file_unicast_host_";
private final TransportService transportService;
private final Path unicastHostsFilePath;
private final AtomicLong nodeIdGenerator = new AtomicLong(); // generates unique ids for the node
@Inject
public FileBasedUnicastHostsProvider(Settings settings, TransportService transportService) {
super(settings);
this.transportService = transportService;
this.unicastHostsFilePath = new Environment(settings).configFile().resolve("discovery-file").resolve(UNICAST_HOSTS_FILE);
}
@Override
public List<DiscoveryNode> buildDynamicNodes() {
List<String> hostsList;
try (Stream<String> lines = Files.lines(unicastHostsFilePath)) {
hostsList = lines.filter(line -> line.startsWith("#") == false) // lines starting with `#` are comments
.collect(Collectors.toList());
} catch (FileNotFoundException | NoSuchFileException e) {
logger.warn((Supplier<?>) () -> new ParameterizedMessage("[discovery-file] Failed to find unicast hosts file [{}]",
unicastHostsFilePath), e);
hostsList = Collections.emptyList();
} catch (IOException e) {
logger.warn((Supplier<?>) () -> new ParameterizedMessage("[discovery-file] Error reading unicast hosts file [{}]",
unicastHostsFilePath), e);
hostsList = Collections.emptyList();
}
final List<DiscoveryNode> discoNodes = new ArrayList<>();
for (final String host : hostsList) {
try {
discoNodes.addAll(resolveDiscoveryNodes(host, 1, transportService,
() -> UNICAST_HOST_PREFIX + nodeIdGenerator.incrementAndGet() + "#"));
} catch (IllegalArgumentException e) {
logger.warn((Supplier<?>) () -> new ParameterizedMessage("[discovery-file] Failed to parse transport address from [{}]",
host), e);
continue;
}
}
logger.debug("[discovery-file] Using dynamic discovery nodes {}", discoNodes);
return discoNodes;
}
}

View File

@ -0,0 +1,43 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.discovery.file;
import com.carrotsearch.randomizedtesting.annotations.Name;
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
import org.elasticsearch.test.rest.yaml.ClientYamlTestCandidate;
import org.elasticsearch.test.rest.yaml.ESClientYamlSuiteTestCase;
import org.elasticsearch.test.rest.yaml.parser.ClientYamlTestParseException;
import java.io.IOException;
/**
* Integration tests to make sure the file-based discovery plugin works in a cluster.
*/
public class FileBasedDiscoveryClientYamlTestSuiteIT extends ESClientYamlSuiteTestCase {
public FileBasedDiscoveryClientYamlTestSuiteIT(@Name("yaml") ClientYamlTestCandidate testCandidate) {
super(testCandidate);
}
@ParametersFactory
public static Iterable<Object[]> parameters() throws IOException, ClientYamlTestParseException {
return ESClientYamlSuiteTestCase.createParameters(0, 1);
}
}

View File

@ -0,0 +1,139 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.discovery.file;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.env.Environment;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.MockTcpTransport;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import static org.elasticsearch.discovery.file.FileBasedUnicastHostsProvider.UNICAST_HOSTS_FILE;
import static org.elasticsearch.discovery.file.FileBasedUnicastHostsProvider.UNICAST_HOST_PREFIX;
/**
* Tests for {@link FileBasedUnicastHostsProvider}.
*/
public class FileBasedUnicastHostsProviderTests extends ESTestCase {
private static ThreadPool threadPool;
private MockTransportService transportService;
@BeforeClass
public static void createThreadPool() {
threadPool = new TestThreadPool(FileBasedUnicastHostsProviderTests.class.getName());
}
@AfterClass
public static void stopThreadPool() throws InterruptedException {
terminate(threadPool);
}
@Before
public void createTransportSvc() {
MockTcpTransport transport =
new MockTcpTransport(Settings.EMPTY,
threadPool,
BigArrays.NON_RECYCLING_INSTANCE,
new NoneCircuitBreakerService(),
new NamedWriteableRegistry(Collections.emptyList()),
new NetworkService(Settings.EMPTY, Collections.emptyList()));
transportService = new MockTransportService(Settings.EMPTY, transport, threadPool);
}
public void testBuildDynamicNodes() throws Exception {
final List<String> hostEntries = Arrays.asList("#comment, should be ignored", "192.168.0.1", "192.168.0.2:9305", "255.255.23.15");
final List<DiscoveryNode> nodes = setupAndRunHostProvider(hostEntries);
assertEquals(hostEntries.size() - 1, nodes.size()); // minus 1 because we are ignoring the first line that's a comment
assertEquals("192.168.0.1", nodes.get(0).getAddress().getHost());
assertEquals(9300, nodes.get(0).getAddress().getPort());
assertEquals(UNICAST_HOST_PREFIX + "1#", nodes.get(0).getId());
assertEquals("192.168.0.2", nodes.get(1).getAddress().getHost());
assertEquals(9305, nodes.get(1).getAddress().getPort());
assertEquals(UNICAST_HOST_PREFIX + "2#", nodes.get(1).getId());
assertEquals("255.255.23.15", nodes.get(2).getAddress().getHost());
assertEquals(9300, nodes.get(2).getAddress().getPort());
assertEquals(UNICAST_HOST_PREFIX + "3#", nodes.get(2).getId());
}
public void testEmptyUnicastHostsFile() throws Exception {
final List<String> hostEntries = Collections.emptyList();
final List<DiscoveryNode> nodes = setupAndRunHostProvider(hostEntries);
assertEquals(0, nodes.size());
}
public void testUnicastHostsDoesNotExist() throws Exception {
final Settings settings = Settings.builder()
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir())
.build();
final FileBasedUnicastHostsProvider provider = new FileBasedUnicastHostsProvider(settings, transportService);
final List<DiscoveryNode> nodes = provider.buildDynamicNodes();
assertEquals(0, nodes.size());
}
public void testInvalidHostEntries() throws Exception {
List<String> hostEntries = Arrays.asList("192.168.0.1:9300:9300");
List<DiscoveryNode> nodes = setupAndRunHostProvider(hostEntries);
assertEquals(0, nodes.size());
}
public void testSomeInvalidHostEntries() throws Exception {
List<String> hostEntries = Arrays.asList("192.168.0.1:9300:9300", "192.168.0.1:9301");
List<DiscoveryNode> nodes = setupAndRunHostProvider(hostEntries);
assertEquals(1, nodes.size()); // only one of the two is valid and will be used
assertEquals("192.168.0.1", nodes.get(0).getAddress().getHost());
assertEquals(9301, nodes.get(0).getAddress().getPort());
}
// sets up the config dir, writes to the unicast hosts file in the config dir,
// and then runs the file-based unicast host provider to get the list of discovery nodes
private List<DiscoveryNode> setupAndRunHostProvider(final List<String> hostEntries) throws IOException {
final Path homeDir = createTempDir();
final Settings settings = Settings.builder()
.put(Environment.PATH_HOME_SETTING.getKey(), homeDir)
.build();
final Path configDir = homeDir.resolve("config").resolve("discovery-file");
Files.createDirectories(configDir);
final Path unicastHostsPath = configDir.resolve(UNICAST_HOSTS_FILE);
try (BufferedWriter writer = Files.newBufferedWriter(unicastHostsPath)) {
writer.write(String.join("\n", hostEntries));
}
return new FileBasedUnicastHostsProvider(settings, transportService).buildDynamicNodes();
}
}

View File

@ -0,0 +1,13 @@
# Integration tests for file-based discovery
#
"Discovery File loaded":
- do:
cluster.state: {}
# Get master node id
- set: { master_node: master }
- do:
nodes.info: {}
- match: { nodes.$master.plugins.0.name: discovery-file }

View File

@ -217,6 +217,10 @@ fi
install_and_check_plugin discovery ec2 aws-java-sdk-core-*.jar
}
@test "[$GROUP] install discovery-file plugin" {
install_and_check_plugin discovery file
}
@test "[$GROUP] install ingest-attachment plugin" {
# we specify the version on the poi-3.15-beta1.jar so that the test does
# not spuriously pass if the jar is missing but the other poi jars
@ -353,6 +357,10 @@ fi
remove_plugin discovery-ec2
}
@test "[$GROUP] remove discovery-file plugin" {
remove_plugin discovery-file
}
@test "[$GROUP] remove ingest-attachment plugin" {
remove_plugin ingest-attachment
}

View File

@ -38,6 +38,7 @@ List projects = [
'plugins:analysis-stempel',
'plugins:discovery-azure-classic',
'plugins:discovery-ec2',
'plugins:discovery-file',
'plugins:discovery-gce',
'plugins:ingest-geoip',
'plugins:ingest-attachment',