diff --git a/core/src/main/java/org/elasticsearch/action/search/RemoteClusterConnection.java b/core/src/main/java/org/elasticsearch/action/search/RemoteClusterConnection.java index 1ebfcc0bfd1..0dda9ed2ef6 100644 --- a/core/src/main/java/org/elasticsearch/action/search/RemoteClusterConnection.java +++ b/core/src/main/java/org/elasticsearch/action/search/RemoteClusterConnection.java @@ -236,6 +236,10 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo connectHandler.close(); } + public boolean isClosed() { + return connectHandler.isClosed(); + } + /** * The connect handler manages node discovery and the actual connect to the remote cluster. * There is at most one connect job running at any time. If such a connect job is triggered @@ -350,6 +354,9 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo void collectRemoteNodes(Iterator seedNodes, final TransportService transportService, ActionListener listener) { + if (Thread.currentThread().isInterrupted()) { + listener.onFailure(new InterruptedException("remote connect thread got interrupted")); + } try { if (seedNodes.hasNext()) { cancellableThreads.executeIO(() -> { @@ -409,6 +416,10 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo } } + final boolean isClosed() { + return closed.get(); + } + /* This class handles the _state response from the remote cluster when sniffing nodes to connect to */ private class SniffClusterStateResponseHandler implements TransportResponseHandler { diff --git a/core/src/main/java/org/elasticsearch/action/search/RemoteClusterService.java b/core/src/main/java/org/elasticsearch/action/search/RemoteClusterService.java index b25a5f52277..26412903981 100644 --- a/core/src/main/java/org/elasticsearch/action/search/RemoteClusterService.java +++ b/core/src/main/java/org/elasticsearch/action/search/RemoteClusterService.java @@ -59,6 +59,8 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.Predicate; +import java.util.stream.Collectors; +import java.util.stream.Stream; /** * Basic service for accessing remote clusters via gateway nodes @@ -68,11 +70,9 @@ public final class RemoteClusterService extends AbstractComponent implements Clo /** * A list of initial seed nodes to discover eligible nodes from the remote cluster */ - //TODO this should be an affix settings? - public static final Setting REMOTE_CLUSTERS_SEEDS = Setting.groupSetting("search.remote.seeds.", - RemoteClusterService::validateRemoteClustersSeeds, - Setting.Property.NodeScope, - Setting.Property.Dynamic); + public static final Setting.AffixSetting> REMOTE_CLUSTERS_SEEDS = Setting.affixKeySetting("search.remote.", + "seeds", (key) -> Setting.listSetting(key, Collections.emptyList(), RemoteClusterService::parseSeedAddress, + Setting.Property.NodeScope, Setting.Property.Dynamic)); /** * The maximum number of connections that will be established to a remote cluster. For instance if there is only a single * seed node, other nodes will be discovered up to the given number of nodes in this setting. The default is 3. @@ -109,12 +109,11 @@ public final class RemoteClusterService extends AbstractComponent implements Clo /** * This method updates the list of remote clusters. It's intended to be used as an update consumer on the settings infrastructure - * @param seedSettings the group settings returned from {@link #REMOTE_CLUSTERS_SEEDS} + * @param seeds a cluster alias to discovery node mapping representing the remote clusters seeds nodes * @param connectionListener a listener invoked once every configured cluster has been connected to */ - void updateRemoteClusters(Settings seedSettings, ActionListener connectionListener) { + private synchronized void updateRemoteClusters(Map> seeds, ActionListener connectionListener) { Map remoteClusters = new HashMap<>(); - Map> seeds = buildRemoteClustersSeeds(seedSettings); if (seeds.isEmpty()) { connectionListener.onResponse(null); } else { @@ -126,13 +125,27 @@ public final class RemoteClusterService extends AbstractComponent implements Clo String attribute = REMOTE_NODE_ATTRIBUTE.get(settings); nodePredicate = nodePredicate.and((node) -> Boolean.getBoolean(node.getAttributes().getOrDefault(attribute, "false"))); } + remoteClusters.putAll(this.remoteClusters); for (Map.Entry> entry : seeds.entrySet()) { RemoteClusterConnection remote = this.remoteClusters.get(entry.getKey()); - if (remote == null) { + if (entry.getValue().isEmpty()) { // with no seed nodes we just remove the connection + try { + IOUtils.close(remote); + } catch (IOException e) { + logger.warn("failed to close remote cluster connections for cluster: " + entry.getKey(), e); + } + remoteClusters.remove(entry.getKey()); + continue; + } + + if (remote == null) { // this is a new cluster we have to add a new representation remote = new RemoteClusterConnection(settings, entry.getKey(), entry.getValue(), transportService, numRemoteConnections, nodePredicate); remoteClusters.put(entry.getKey(), remote); } + + // now update the seed nodes no matter if it's new or already existing + RemoteClusterConnection finalRemote = remote; remote.updateSeedNodes(entry.getValue(), ActionListener.wrap( response -> { if (countDown.countDown()) { @@ -143,14 +156,13 @@ public final class RemoteClusterService extends AbstractComponent implements Clo if (countDown.fastForward()) { connectionListener.onFailure(exception); } - logger.error("failed to update seed list for cluster: " + entry.getKey(), exception); + if (finalRemote.isClosed() == false) { + logger.warn("failed to update seed list for cluster: " + entry.getKey(), exception); + } })); } } - if (remoteClusters.isEmpty() == false) { - remoteClusters.putAll(this.remoteClusters); - this.remoteClusters = Collections.unmodifiableMap(remoteClusters); - } + this.remoteClusters = Collections.unmodifiableMap(remoteClusters); } /** @@ -296,65 +308,55 @@ public final class RemoteClusterService extends AbstractComponent implements Clo return connection.getConnection(node); } - - static Map> buildRemoteClustersSeeds(Settings settings) { - Map> remoteClustersNodes = new HashMap<>(); - for (String clusterName : settings.names()) { - String[] remoteHosts = settings.getAsArray(clusterName); - for (String remoteHost : remoteHosts) { - int portSeparator = remoteHost.lastIndexOf(':'); // in case we have a IPv6 address ie. [::1]:9300 - String host = remoteHost.substring(0, portSeparator); - InetAddress hostAddress; - try { - hostAddress = InetAddress.getByName(host); - } catch (UnknownHostException e) { - throw new IllegalArgumentException("unknown host [" + host + "]", e); - } - int port = Integer.valueOf(remoteHost.substring(portSeparator + 1)); - DiscoveryNode node = new DiscoveryNode(clusterName + "#" + remoteHost, - new TransportAddress(new InetSocketAddress(hostAddress, port)), + public void updateRemoteCluster(String clusterAlias, List addresses) { + updateRemoteClusters(Collections.singletonMap(clusterAlias, addresses.stream().map(address -> { + TransportAddress transportAddress = new TransportAddress(address); + return new DiscoveryNode(clusterAlias + "#" + transportAddress.toString(), + transportAddress, Version.CURRENT.minimumCompatibilityVersion()); - List nodes = remoteClustersNodes.get(clusterName); - if (nodes == null) { - nodes = new ArrayList<>(); - remoteClustersNodes.put(clusterName, nodes); - } - nodes.add(node); - } - } - return remoteClustersNodes; + }).collect(Collectors.toList())), + ActionListener.wrap((x) -> {}, (x) -> {}) ); } - static void validateRemoteClustersSeeds(Settings settings) { - for (String clusterName : settings.names()) { - String[] remoteHosts = settings.getAsArray(clusterName); - if (remoteHosts.length == 0) { - throw new IllegalArgumentException("no hosts set for remote cluster [" + clusterName + "], at least one host is required"); - } - for (String remoteHost : remoteHosts) { - int portSeparator = remoteHost.lastIndexOf(':'); // in case we have a IPv6 address ie. [::1]:9300 - if (portSeparator == -1 || portSeparator == remoteHost.length()) { - throw new IllegalArgumentException("remote hosts need to be configured as [host:port], found [" + remoteHost + "] " + - "instead for remote cluster [" + clusterName + "]"); - } - String host = remoteHost.substring(0, portSeparator); - try { - InetAddress.getByName(host); - } catch (UnknownHostException e) { - throw new IllegalArgumentException("unknown host [" + host + "]", e); - } - String port = remoteHost.substring(portSeparator + 1); - try { - Integer portValue = Integer.valueOf(port); - if (portValue <= 0) { - throw new IllegalArgumentException("port number must be > 0 but was: [" + portValue + "]"); - } - } catch (NumberFormatException e) { - throw new IllegalArgumentException("port must be a number, found [" + port + "] instead for remote cluster [" + - clusterName + "]"); - } + static Map> buildRemoteClustersSeeds(Settings settings) { + Stream>> allConcreteSettings = REMOTE_CLUSTERS_SEEDS.getAllConcreteSettings(settings); + return allConcreteSettings.collect( + Collectors.toMap(REMOTE_CLUSTERS_SEEDS::getNamespace, concreteSetting -> { + String clusterName = REMOTE_CLUSTERS_SEEDS.getNamespace(concreteSetting); + List nodes = new ArrayList<>(); + for (InetSocketAddress address : concreteSetting.get(settings)) { + TransportAddress transportAddress = new TransportAddress(address); + DiscoveryNode node = new DiscoveryNode(clusterName + "#" + transportAddress.toString(), + transportAddress, + Version.CURRENT.minimumCompatibilityVersion()); + nodes.add(node); } + return nodes; + })); + } + + static final InetSocketAddress parseSeedAddress(String remoteHost) { + int portSeparator = remoteHost.lastIndexOf(':'); // in case we have a IPv6 address ie. [::1]:9300 + if (portSeparator == -1 || portSeparator == remoteHost.length()) { + throw new IllegalArgumentException("remote hosts need to be configured as [host:port], found [" + remoteHost + "] instead"); } + String host = remoteHost.substring(0, portSeparator); + InetAddress hostAddress; + try { + hostAddress = InetAddress.getByName(host); + } catch (UnknownHostException e) { + throw new IllegalArgumentException("unknown host [" + host + "]", e); + } + try { + int port = Integer.valueOf(remoteHost.substring(portSeparator + 1)); + if (port <= 0) { + throw new IllegalArgumentException("port number must be > 0 but was: [" + port + "]"); + } + return new InetSocketAddress(hostAddress, port); + } catch (NumberFormatException e) { + throw new IllegalArgumentException("port must be a number"); + } + } /** @@ -364,7 +366,8 @@ public final class RemoteClusterService extends AbstractComponent implements Clo void initializeRemoteClusters() { final TimeValue timeValue = REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings); final PlainActionFuture future = new PlainActionFuture<>(); - updateRemoteClusters(REMOTE_CLUSTERS_SEEDS.get(settings), future); + Map> seeds = buildRemoteClustersSeeds(settings); + updateRemoteClusters(seeds, future); try { future.get(timeValue.millis(), TimeUnit.MILLISECONDS); } catch (InterruptedException e) { diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java index f4ccd837ac6..2eb6633b1f7 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java @@ -83,10 +83,8 @@ public class SearchTransportService extends AbstractLifecycleComponent { super(settings); this.transportService = transportService; this.remoteClusterService = new RemoteClusterService(settings, transportService); - final Consumer clusterUpdateConsumer = (s) -> remoteClusterService.updateRemoteClusters(s, - ActionListener.wrap((x) -> {}, (x) -> {})); - clusterSettings.addSettingsUpdateConsumer(RemoteClusterService.REMOTE_CLUSTERS_SEEDS, clusterUpdateConsumer, - RemoteClusterService::validateRemoteClustersSeeds); + clusterSettings.addAffixUpdateConsumer(RemoteClusterService.REMOTE_CLUSTERS_SEEDS, remoteClusterService::updateRemoteCluster, + (namespace, value) -> {}); } public void sendFreeContext(Transport.Connection connection, final long contextId, SearchRequest request) { diff --git a/core/src/main/java/org/elasticsearch/common/settings/Setting.java b/core/src/main/java/org/elasticsearch/common/settings/Setting.java index d3bc4ebaf0b..5d592b8c452 100644 --- a/core/src/main/java/org/elasticsearch/common/settings/Setting.java +++ b/core/src/main/java/org/elasticsearch/common/settings/Setting.java @@ -42,6 +42,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.EnumSet; +import java.util.HashMap; import java.util.IdentityHashMap; import java.util.List; import java.util.Map; @@ -480,7 +481,7 @@ public class Setting extends ToXContentToBytes { public Map, T> getValue(Settings current, Settings previous) { // we collect all concrete keys and then delegate to the actual setting for validation and settings extraction final Map, T> result = new IdentityHashMap<>(); - Stream.concat(matchStream(current), matchStream(previous)).forEach(aKey -> { + Stream.concat(matchStream(current), matchStream(previous)).distinct().forEach(aKey -> { String namespace = key.getNamespace(aKey); AbstractScopedSettings.SettingUpdater updater = getConcreteSetting(aKey).newUpdater((v) -> consumer.accept(namespace, v), logger, @@ -504,6 +505,18 @@ public class Setting extends ToXContentToBytes { }; } + @Override + public T get(Settings settings) { + throw new UnsupportedOperationException("affix settings can't return values" + + " use #getConcreteSetting to obtain a concrete setting"); + } + + @Override + public String getRaw(Settings settings) { + throw new UnsupportedOperationException("affix settings can't return values" + + " use #getConcreteSetting to obtain a concrete setting"); + } + @Override public Setting getConcreteSetting(String key) { if (match(key)) { @@ -517,6 +530,22 @@ public class Setting extends ToXContentToBytes { public void diff(Settings.Builder builder, Settings source, Settings defaultSettings) { matchStream(defaultSettings).forEach((key) -> getConcreteSetting(key).diff(builder, source, defaultSettings)); } + + /** + * Returns the namespace for a concrete settting. Ie. an affix setting with prefix: search. and suffix: username + * will return remote as a namespace for the setting search.remote.username + */ + public String getNamespace(Setting concreteSetting) { + return key.getNamespace(concreteSetting.getKey()); + } + + /** + * Returns a stream of all concrete setting instances for the given settings. AffixSetting is only a specification, concrete + * settings depend on an actual set of setting keys. + */ + public Stream> getAllConcreteSettings(Settings settings) { + return matchStream(settings).distinct().map(this::getConcreteSetting); + } } diff --git a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java index c2f0832b75e..8be107b1919 100644 --- a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -477,8 +477,10 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i @Override public final NodeChannels openConnection(DiscoveryNode node, ConnectionProfile connectionProfile) throws IOException { + boolean success = false; + NodeChannels nodeChannels = null; try { - NodeChannels nodeChannels = connectToChannels(node, connectionProfile); + nodeChannels = connectToChannels(node, connectionProfile); final Channel channel = nodeChannels.getChannels().get(0); // one channel is guaranteed by the connection profile final TimeValue connectTimeout = connectionProfile.getConnectTimeout() == null ? defaultConnectionProfile.getConnectTimeout() : @@ -487,13 +489,19 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i connectTimeout : connectionProfile.getHandshakeTimeout(); final Version version = executeHandshake(node, channel, handshakeTimeout); transportServiceAdapter.onConnectionOpened(node); - return new NodeChannels(nodeChannels, version); // clone the channels - we now have the correct version + nodeChannels = new NodeChannels(nodeChannels, version);// clone the channels - we now have the correct version + success = true; + return nodeChannels; } catch (ConnectTransportException e) { throw e; } catch (Exception e) { // ConnectTransportExceptions are handled specifically on the caller end - we wrap the actual exception to ensure // only relevant exceptions are logged on the caller end.. this is the same as in connectToNode throw new ConnectTransportException(node, "general node connection failure", e); + } finally { + if (success == false) { + IOUtils.closeWhileHandlingException(nodeChannels); + } } } diff --git a/core/src/test/java/org/elasticsearch/action/search/RemoteClusterServiceTests.java b/core/src/test/java/org/elasticsearch/action/search/RemoteClusterServiceTests.java index 68136bfe376..9650b0821dc 100644 --- a/core/src/test/java/org/elasticsearch/action/search/RemoteClusterServiceTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/RemoteClusterServiceTests.java @@ -85,20 +85,20 @@ public class RemoteClusterServiceTests extends ESTestCase { public void testRemoteClusterSeedSetting() { // simple validation - RemoteClusterService.REMOTE_CLUSTERS_SEEDS.get(Settings.builder() - .put("search.remote.seeds.foo", "192.168.0.1:8080") - .put("search.remote.seeds.bar", "[::1]:9090").build()); + Settings settings = Settings.builder() + .put("search.remote.foo.seeds", "192.168.0.1:8080") + .put("search.remote.bar.seed", "[::1]:9090").build(); + RemoteClusterService.REMOTE_CLUSTERS_SEEDS.getAllConcreteSettings(settings).forEach(setting -> setting.get(settings)); + Settings brokenSettings = Settings.builder() + .put("search.remote.foo.seeds", "192.168.0.1").build(); expectThrows(IllegalArgumentException.class, () -> - RemoteClusterService.REMOTE_CLUSTERS_SEEDS.get(Settings.builder() - .put("search.remote.seeds.foo", "192.168.0.1").build())); + RemoteClusterService.REMOTE_CLUSTERS_SEEDS.getAllConcreteSettings(brokenSettings).forEach(setting -> setting.get(brokenSettings))); } public void testBuiltRemoteClustersSeeds() throws Exception { Map> map = RemoteClusterService.buildRemoteClustersSeeds( - RemoteClusterService.REMOTE_CLUSTERS_SEEDS.get(Settings.builder() - .put("search.remote.seeds.foo", "192.168.0.1:8080") - .put("search.remote.seeds.bar", "[::1]:9090").build())); + Settings.builder().put("search.remote.foo.seeds", "192.168.0.1:8080").put("search.remote.bar.seeds", "[::1]:9090").build()); assertEquals(2, map.size()); assertTrue(map.containsKey("foo")); assertTrue(map.containsKey("bar")); @@ -133,8 +133,8 @@ public class RemoteClusterServiceTests extends ESTestCase { transportService.start(); transportService.acceptIncomingRequests(); Settings.Builder builder = Settings.builder(); - builder.putArray("search.remote.seeds.cluster_1", seedNode.getAddress().toString()); - builder.putArray("search.remote.seeds.cluster_2", otherSeedNode.getAddress().toString()); + builder.putArray("search.remote.cluster_1.seeds", seedNode.getAddress().toString()); + builder.putArray("search.remote.cluster_2.seeds", otherSeedNode.getAddress().toString()); try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) { assertFalse(service.isCrossClusterSearchEnabled()); service.initializeRemoteClusters(); @@ -154,6 +154,41 @@ public class RemoteClusterServiceTests extends ESTestCase { } } + public void testIncrementallyAddClusters() throws IOException { + List knownNodes = new CopyOnWriteArrayList<>(); + try (MockTransportService seedTransport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); + MockTransportService otherSeedTransport = startTransport("cluster_2_node", knownNodes, Version.CURRENT)) { + DiscoveryNode seedNode = seedTransport.getLocalDiscoNode(); + DiscoveryNode otherSeedNode = otherSeedTransport.getLocalDiscoNode(); + knownNodes.add(seedTransport.getLocalDiscoNode()); + knownNodes.add(otherSeedTransport.getLocalDiscoNode()); + Collections.shuffle(knownNodes, random()); + + try (MockTransportService transportService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, + null)) { + transportService.start(); + transportService.acceptIncomingRequests(); + Settings.Builder builder = Settings.builder(); + builder.putArray("search.remote.cluster_1.seeds", seedNode.getAddress().toString()); + builder.putArray("search.remote.cluster_2.seeds", otherSeedNode.getAddress().toString()); + try (RemoteClusterService service = new RemoteClusterService(Settings.EMPTY, transportService)) { + assertFalse(service.isCrossClusterSearchEnabled()); + service.initializeRemoteClusters(); + assertFalse(service.isCrossClusterSearchEnabled()); + service.updateRemoteCluster("cluster_1", Collections.singletonList(seedNode.getAddress().address())); + assertTrue(service.isCrossClusterSearchEnabled()); + assertTrue(service.isRemoteClusterRegistered("cluster_1")); + service.updateRemoteCluster("cluster_2", Collections.singletonList(otherSeedNode.getAddress().address())); + assertTrue(service.isCrossClusterSearchEnabled()); + assertTrue(service.isRemoteClusterRegistered("cluster_1")); + assertTrue(service.isRemoteClusterRegistered("cluster_2")); + service.updateRemoteCluster("cluster_2", Collections.emptyList()); + assertFalse(service.isRemoteClusterRegistered("cluster_2")); + } + } + } + } + public void testProcessRemoteShards() throws IOException { try (RemoteClusterService service = new RemoteClusterService(Settings.EMPTY, null)) { assertFalse(service.isCrossClusterSearchEnabled()); diff --git a/core/src/test/java/org/elasticsearch/common/settings/SettingTests.java b/core/src/test/java/org/elasticsearch/common/settings/SettingTests.java index 4ce23ebcaf0..3789ea40459 100644 --- a/core/src/test/java/org/elasticsearch/common/settings/SettingTests.java +++ b/core/src/test/java/org/elasticsearch/common/settings/SettingTests.java @@ -31,6 +31,8 @@ import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; @@ -475,6 +477,33 @@ public class SettingTests extends ESTestCase { assertFalse(listAffixSetting.match("foo")); } + public void testGetAllConcreteSettings() { + Setting.AffixSetting> listAffixSetting = Setting.affixKeySetting("foo.", "bar", + (key) -> Setting.listSetting(key, Collections.emptyList(), Function.identity(), Property.NodeScope)); + + Settings settings = Settings.builder() + .putArray("foo.1.bar", "1", "2") + .putArray("foo.2.bar", "3", "4", "5") + .putArray("foo.bar", "6") + .putArray("some.other", "6") + .putArray("foo.3.bar", "6") + .build(); + Stream>> allConcreteSettings = listAffixSetting.getAllConcreteSettings(settings); + Map> collect = allConcreteSettings.collect(Collectors.toMap(Setting::getKey, (s) -> s.get(settings))); + assertEquals(3, collect.size()); + assertEquals(Arrays.asList("1", "2"), collect.get("foo.1.bar")); + assertEquals(Arrays.asList("3", "4", "5"), collect.get("foo.2.bar")); + assertEquals(Arrays.asList("6"), collect.get("foo.3.bar")); + } + + public void testAffixSettingsFailOnGet() { + Setting.AffixSetting> listAffixSetting = Setting.affixKeySetting("foo.", "bar", + (key) -> Setting.listSetting(key, Collections.singletonList("testelement"), Function.identity(), Property.NodeScope)); + expectThrows(UnsupportedOperationException.class, () -> listAffixSetting.get(Settings.EMPTY)); + expectThrows(UnsupportedOperationException.class, () -> listAffixSetting.getRaw(Settings.EMPTY)); + assertEquals(Collections.singletonList("testelement"), listAffixSetting.getDefault(Settings.EMPTY)); + assertEquals("[\"testelement\"]", listAffixSetting.getDefaultRaw(Settings.EMPTY)); + } public void testMinMaxInt() { Setting integerSetting = Setting.intSetting("foo.bar", 1, 0, 10, Property.NodeScope); diff --git a/docs/reference/modules/cross-cluster-search.asciidoc b/docs/reference/modules/cross-cluster-search.asciidoc index ce2bac98dd5..18bf597f596 100644 --- a/docs/reference/modules/cross-cluster-search.asciidoc +++ b/docs/reference/modules/cross-cluster-search.asciidoc @@ -24,9 +24,10 @@ remote clusters that should be connected to, for instance: -------------------------------- search: remote: - seeds: - cluster_one: 127.0.0.1:9300 <1> - cluster_two: 127.0.0.1:9301 <1> + cluster_one: <1> + seeds: 127.0.0.1:9300 + cluster_two: <1> + seeds: 127.0.0.1:9301 -------------------------------- <1> `cluster_one` and `cluster_two` are arbitrary names representing the connection to each cluster. These names are subsequently used to distinguish between local and remote indices. diff --git a/qa/multi-cluster-search/build.gradle b/qa/multi-cluster-search/build.gradle index 1abe6e984c3..b48cd9a3dd1 100644 --- a/qa/multi-cluster-search/build.gradle +++ b/qa/multi-cluster-search/build.gradle @@ -35,7 +35,7 @@ task mixedClusterTest(type: RestIntegTestTask) { dependsOn(remoteClusterTest) cluster { distribution = 'zip' - setting 'search.remote.seeds.my_remote_cluster', "\"${-> remoteClusterTest.nodes.get(0).transportUri()}\"" + setting 'search.remote.my_remote_cluster.seeds', "\"${-> remoteClusterTest.nodes.get(0).transportUri()}\"" setting 'search.remote.connections_per_cluster', 1 } diff --git a/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/10_basic.yaml b/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/10_basic.yaml index 4ec7747bfee..31b8fbd251e 100644 --- a/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/10_basic.yaml +++ b/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/10_basic.yaml @@ -99,16 +99,16 @@ cluster.get_settings: include_defaults: true - - set: { defaults.search.remote.seeds.my_remote_cluster: remote_ip } + - set: { defaults.search.remote.my_remote_cluster.seeds.0: remote_ip } - do: cluster.put_settings: flat_settings: true body: transient: - search.remote.seeds.test_remote_cluster: $remote_ip + search.remote.test_remote_cluster.seeds: $remote_ip - - match: {transient: {search.remote.seeds.test_remote_cluster: $remote_ip}} + - match: {transient: {search.remote.test_remote_cluster.seeds: $remote_ip}} - do: search: