diff --git a/buildSrc/src/main/groovy/org/elasticsearch/gradle/test/ClusterConfiguration.groovy b/buildSrc/src/main/groovy/org/elasticsearch/gradle/test/ClusterConfiguration.groovy index ca4957f7a6c..2c5159bc2d8 100644 --- a/buildSrc/src/main/groovy/org/elasticsearch/gradle/test/ClusterConfiguration.groovy +++ b/buildSrc/src/main/groovy/org/elasticsearch/gradle/test/ClusterConfiguration.groovy @@ -96,6 +96,15 @@ class ClusterConfiguration { return seedNode.transportUri() } + /** + * A closure to call which returns a map of settings. + * + * This can be used to pass settings to a cluster that are not available at evaluation time ie. + * the address of a remote cluster etc. + */ + @Input + Closure dynamicSettings = { -> Collections.emptyMap() } + /** * A closure to call before the cluster is considered ready. The closure is passed the node info, * as well as a groovy AntBuilder, to enable running ant condition checks. The default wait diff --git a/buildSrc/src/main/groovy/org/elasticsearch/gradle/test/ClusterFormationTasks.groovy b/buildSrc/src/main/groovy/org/elasticsearch/gradle/test/ClusterFormationTasks.groovy index 73d7a3c7cd9..b911a7b5ea8 100644 --- a/buildSrc/src/main/groovy/org/elasticsearch/gradle/test/ClusterFormationTasks.groovy +++ b/buildSrc/src/main/groovy/org/elasticsearch/gradle/test/ClusterFormationTasks.groovy @@ -294,6 +294,8 @@ class ClusterFormationTasks { if (unicastTransportUri != null) { esConfig['discovery.zen.ping.unicast.hosts'] = "\"${unicastTransportUri}\"" } + Map dynamicSettings = node.config.dynamicSettings(); + esConfig.putAll(dynamicSettings) File configFile = new File(node.confDir, 'elasticsearch.yml') logger.info("Configuring ${configFile}") configFile.setText(esConfig.collect { key, value -> "${key}: ${value}" }.join('\n'), 'UTF-8') diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java index 892567d096b..65254a75193 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java @@ -115,31 +115,35 @@ public class SearchTransportService extends AbstractComponent { throw new IllegalArgumentException("no hosts set for remote cluster [" + clusterName + "], at least one host is required"); } for (String remoteHost : remoteHosts) { - String[] strings = remoteHost.split(":"); - if (strings.length != 2) { + int portSeparator = remoteHost.lastIndexOf(':'); // in case we have a IPv6 address ie. [::1]:9300 + if (portSeparator == -1 || portSeparator == remoteHost.length()) { throw new IllegalArgumentException("remote hosts need to be configured as [host:port], found [" + remoteHost + "] " + - "instead for remote cluster [" + clusterName + "]"); + "instead for remote cluster [" + clusterName + "]"); } + String port = remoteHost.substring(portSeparator+1); try { - Integer.valueOf(strings[1]); + Integer portValue = Integer.valueOf(port); + if (portValue <= 0) { + throw new IllegalArgumentException("port number must be > 0 but was: [" + portValue + "]"); + } } catch(NumberFormatException e) { - throw new IllegalArgumentException("port must be a number, found [" + strings[1] + "] instead for remote cluster [" + + throw new IllegalArgumentException("port must be a number, found [" + port + "] instead for remote cluster [" + clusterName + "]"); } } } } - private void setRemoteClustersSeeds(Settings settings) { + static Map> builtRemoteClustersSeeds(Settings settings) { Map> remoteClustersNodes = new HashMap<>(); for (String clusterName : settings.names()) { String[] remoteHosts = settings.getAsArray(clusterName); for (String remoteHost : remoteHosts) { - String[] strings = remoteHost.split(":"); - String host = strings[0]; - int port = Integer.valueOf(strings[1]); + int portSeparator = remoteHost.lastIndexOf(':'); // in case we have a IPv6 address ie. [::1]:9300 + String host = remoteHost.substring(0, portSeparator); + int port = Integer.valueOf(remoteHost.substring(portSeparator+1)); DiscoveryNode node = new DiscoveryNode(clusterName + "#" + remoteHost, - new TransportAddress(new InetSocketAddress(host, port)), Version.CURRENT.minimumCompatibilityVersion()); + new TransportAddress(new InetSocketAddress(host, port)), Version.CURRENT.minimumCompatibilityVersion()); //don't connect yet as that would require the remote node to be up and would fail the local node startup otherwise List nodes = remoteClustersNodes.get(clusterName); if (nodes == null) { @@ -149,7 +153,11 @@ public class SearchTransportService extends AbstractComponent { nodes.add(node); } } - remoteClustersSeeds = remoteClustersNodes; + return remoteClustersNodes; + } + + private void setRemoteClustersSeeds(Settings settings) { + remoteClustersSeeds = builtRemoteClustersSeeds(settings); } private DiscoveryNode connectToRemoteCluster(String clusterName) { diff --git a/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index c56c7bac94e..fb767dc4dbf 100644 --- a/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -137,38 +137,35 @@ public class TransportSearchAction extends HandledTransportAction>() { - @Override - public void onResponse(Map searchShardsResponses) { - List remoteShardIterators = new ArrayList<>(); - Set remoteNodes = new HashSet<>(); - Set remoteUUIDs = new HashSet<>(); - for (Map.Entry entry : searchShardsResponses.entrySet()) { - String clusterName = entry.getKey(); - ClusterSearchShardsResponse searchShardsResponse = entry.getValue(); - Collections.addAll(remoteNodes, searchShardsResponse.getNodes()); - for (ClusterSearchShardsGroup clusterSearchShardsGroup : searchShardsResponse.getGroups()) { - //add the cluster name to the remote index names for indices disambiguation - //this ends up in the hits returned with the search response - Index index = new Index(clusterName + REMOTE_CLUSTER_INDEX_SEPARATOR + - clusterSearchShardsGroup.getShardId().getIndex().getName(), - clusterSearchShardsGroup.getShardId().getIndex().getUUID()); - ShardId shardId = new ShardId(index, clusterSearchShardsGroup.getShardId().getId()); - ShardIterator shardIterator = new PlainShardIterator(shardId, - Arrays.asList(clusterSearchShardsGroup.getShards())); - remoteShardIterators.add(shardIterator); - remoteUUIDs.add(clusterSearchShardsGroup.getShardId().getIndex().getUUID()); - } - } - executeSearch((SearchTask)task, startTimeInMillis, searchRequest, localIndices, - remoteUUIDs.toArray(new String[remoteUUIDs.size()]), remoteShardIterators, remoteNodes, listener); - } + ActionListener.wrap((searchShardsResponses) -> { + List remoteShardIterators = new ArrayList<>(); + Set remoteNodes = new HashSet<>(); + Set remoteUUIDs = new HashSet<>(); + processRemoteShards(searchShardsResponses, remoteShardIterators, remoteNodes, remoteUUIDs); + executeSearch((SearchTask)task, startTimeInMillis, searchRequest, localIndices, + remoteUUIDs.toArray(new String[remoteUUIDs.size()]), remoteShardIterators, remoteNodes, listener); + }, listener::onFailure)); + } + } - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + private void processRemoteShards(Map searchShardsResponses, + List remoteShardIterators, Set remoteNodes, Set remoteUUIDs) { + for (Map.Entry entry : searchShardsResponses.entrySet()) { + String clusterName = entry.getKey(); + ClusterSearchShardsResponse searchShardsResponse = entry.getValue(); + Collections.addAll(remoteNodes, searchShardsResponse.getNodes()); + for (ClusterSearchShardsGroup clusterSearchShardsGroup : searchShardsResponse.getGroups()) { + //add the cluster name to the remote index names for indices disambiguation + //this ends up in the hits returned with the search response + Index index = new Index(clusterName + REMOTE_CLUSTER_INDEX_SEPARATOR + + clusterSearchShardsGroup.getShardId().getIndex().getName(), + clusterSearchShardsGroup.getShardId().getIndex().getUUID()); + ShardId shardId = new ShardId(index, clusterSearchShardsGroup.getShardId().getId()); + ShardIterator shardIterator = new PlainShardIterator(shardId, + Arrays.asList(clusterSearchShardsGroup.getShards())); + remoteShardIterators.add(shardIterator); + remoteUUIDs.add(clusterSearchShardsGroup.getShardId().getIndex().getUUID()); + } } } @@ -181,8 +178,13 @@ public class TransportSearchAction extends HandledTransportAction 0) { + indices = new Index[0]; // don't search on ALL if nothing is specified + } else { + indices = indexNameExpressionResolver.concreteIndices(clusterState, searchRequest.indicesOptions(), + startTimeInMillis, localIndices); + } Map aliasFilter = buildPerIndexAliasFilter(searchRequest, clusterState, indices, remoteUUIDs); Map> routingMap = indexNameExpressionResolver.resolveSearchRouting(clusterState, searchRequest.routing(), searchRequest.indices()); diff --git a/core/src/test/java/org/elasticsearch/action/search/SearchTransportServiceTests.java b/core/src/test/java/org/elasticsearch/action/search/SearchTransportServiceTests.java new file mode 100644 index 00000000000..81c26357543 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/action/search/SearchTransportServiceTests.java @@ -0,0 +1,67 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.search; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.test.ESTestCase; + +import java.net.InetSocketAddress; +import java.util.Collections; +import java.util.EnumSet; +import java.util.List; +import java.util.Map; + +public class SearchTransportServiceTests extends ESTestCase { + + public void testRemoteClusterSeedSetting() { + // simple validation + SearchTransportService.REMOTE_CLUSTERS_SEEDS.get(Settings.builder() + .put("action.search.remote.foo", "192.168.0.1:8080") + .put("action.search.remote.bar", "[::1]:9090").build()); + + expectThrows(IllegalArgumentException.class, () -> + SearchTransportService.REMOTE_CLUSTERS_SEEDS.get(Settings.builder() + .put("action.search.remote.foo", "192.168.0.1").build())); + } + + public void testBuiltRemoteClustersSeeds() { + Map> map = SearchTransportService.builtRemoteClustersSeeds( + SearchTransportService.REMOTE_CLUSTERS_SEEDS.get(Settings.builder() + .put("action.search.remote.foo", "192.168.0.1:8080") + .put("action.search.remote.bar", "[::1]:9090").build())); + assertEquals(2, map.size()); + assertTrue(map.containsKey("foo")); + assertTrue(map.containsKey("bar")); + assertEquals(1, map.get("foo").size()); + assertEquals(1, map.get("bar").size()); + + DiscoveryNode foo = map.get("foo").get(0); + assertEquals(foo.getAddress(), new TransportAddress(new InetSocketAddress("192.168.0.1", 8080))); + assertEquals(foo.getId(), "foo#192.168.0.1:8080"); + assertEquals(foo.getVersion(), Version.CURRENT.minimumCompatibilityVersion()); + + DiscoveryNode bar = map.get("bar").get(0); + assertEquals(bar.getAddress(), new TransportAddress(new InetSocketAddress("[::1]", 9090))); + assertEquals(bar.getId(), "bar#[::1]:9090"); + assertEquals(bar.getVersion(), Version.CURRENT.minimumCompatibilityVersion()); + } +} diff --git a/qa/multi-cluster-search/build.gradle b/qa/multi-cluster-search/build.gradle new file mode 100644 index 00000000000..7721ed660f0 --- /dev/null +++ b/qa/multi-cluster-search/build.gradle @@ -0,0 +1,53 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.elasticsearch.gradle.test.RestIntegTestTask + +apply plugin: 'elasticsearch.standalone-test' + +task remoteClusterTest(type: RestIntegTestTask) { + mustRunAfter(precommit) + cluster { + distribution = 'zip' + numNodes = 1 + clusterName = 'remote-cluster' + } + systemProperty 'tests.rest.suite', 'remote_cluster' +} + +task mixedClusterTest(type: RestIntegTestTask) { + dependsOn(remoteClusterTest) + cluster { + distribution = 'zip' + dynamicSettings = { -> + Collections.singletonMap("action.search.remote.my_remote_cluster", + "\"${remoteClusterTest.nodes.get(0).transportUri()}\"") + } + } + systemProperty 'tests.rest.suite', 'multi_cluster' + finalizedBy 'remoteClusterTest#stop' +} + +task integTest { + dependsOn = [mixedClusterTest] +} + +test.enabled = false // no unit tests for multi-cluster-search, only the rest integration test + +check.dependsOn(integTest) diff --git a/qa/multi-cluster-search/src/test/java/org/elasticsearch/upgrades/MultiClusterSearchYamlTestSuiteIT.java b/qa/multi-cluster-search/src/test/java/org/elasticsearch/upgrades/MultiClusterSearchYamlTestSuiteIT.java new file mode 100644 index 00000000000..ee140da23e7 --- /dev/null +++ b/qa/multi-cluster-search/src/test/java/org/elasticsearch/upgrades/MultiClusterSearchYamlTestSuiteIT.java @@ -0,0 +1,48 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.upgrades; + +import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; +import com.carrotsearch.randomizedtesting.annotations.TimeoutSuite; +import org.apache.lucene.util.TimeUnits; +import org.elasticsearch.test.rest.yaml.ClientYamlTestCandidate; +import org.elasticsearch.test.rest.yaml.ESClientYamlSuiteTestCase; +import org.elasticsearch.test.rest.yaml.parser.ClientYamlTestParseException; + +import java.io.IOException; + +@TimeoutSuite(millis = 5 * TimeUnits.MINUTE) // to account for slow as hell VMs +public class MultiClusterSearchYamlTestSuiteIT extends ESClientYamlSuiteTestCase { + + @Override + protected boolean preserveIndicesUponCompletion() { + return true; + } + + public MultiClusterSearchYamlTestSuiteIT(ClientYamlTestCandidate testCandidate) { + super(testCandidate); + } + + @ParametersFactory + public static Iterable parameters() throws IOException, ClientYamlTestParseException { + return createParameters(); + } +} + diff --git a/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/10_basic.yaml b/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/10_basic.yaml new file mode 100644 index 00000000000..a6acdf7b98f --- /dev/null +++ b/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/10_basic.yaml @@ -0,0 +1,47 @@ +--- +"Index data and search on the mixed cluster": + + - do: + indices.create: + index: test_index_1 + body: + settings: + index: + number_of_replicas: 0 + + - do: + bulk: + refresh: true + body: + - '{"index": {"_index": "test_index_1", "_type": "test_type"}}' + - '{"f1": "v1_old", "f2": 0}' + - '{"index": {"_index": "test_index_1", "_type": "test_type"}}' + - '{"f1": "v2_old", "f2": 1}' + - '{"index": {"_index": "test_index_1", "_type": "test_type"}}' + - '{"f1": "v3_old", "f2": 2}' + - '{"index": {"_index": "test_index_1", "_type": "test_type"}}' + - '{"f1": "v4_old", "f2": 3}' + - '{"index": {"_index": "test_index_1", "_type": "test_type"}}' + - '{"f1": "v5_old", "f2": 4}' + + - do: + indices.flush: + index: test_index_1 + + - do: + search: + index: test_index_1,my_remote_cluster|test_index + + - match: { hits.total: 10 } + + - do: + search: + index: my_remote_cluster|test_index + + - match: { hits.total: 5} + + - do: + search: + index: test_index_1 + + - match: { hits.total: 5} diff --git a/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/remote_cluster/10_basic.yaml b/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/remote_cluster/10_basic.yaml new file mode 100644 index 00000000000..5f55407d6a4 --- /dev/null +++ b/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/remote_cluster/10_basic.yaml @@ -0,0 +1,35 @@ +--- +"Index data and search on the old cluster": + + - do: + indices.create: + index: test_index + body: + settings: + index: + number_of_replicas: 0 + + - do: + bulk: + refresh: true + body: + - '{"index": {"_index": "test_index", "_type": "test_type"}}' + - '{"f1": "v1_old", "f2": 0}' + - '{"index": {"_index": "test_index", "_type": "test_type"}}' + - '{"f1": "v2_old", "f2": 1}' + - '{"index": {"_index": "test_index", "_type": "test_type"}}' + - '{"f1": "v3_old", "f2": 2}' + - '{"index": {"_index": "test_index", "_type": "test_type"}}' + - '{"f1": "v4_old", "f2": 3}' + - '{"index": {"_index": "test_index", "_type": "test_type"}}' + - '{"f1": "v5_old", "f2": 4}' + + - do: + indices.flush: + index: test_index + + - do: + search: + index: test_index + + - match: { hits.total: 5 } diff --git a/settings.gradle b/settings.gradle index f3fb14674eb..65f47fb84c7 100644 --- a/settings.gradle +++ b/settings.gradle @@ -54,6 +54,7 @@ List projects = [ 'qa:evil-tests', 'qa:no-bootstrap-tests', 'qa:rolling-upgrade', + 'qa:multi-cluster-search', 'qa:smoke-test-client', 'qa:smoke-test-http', 'qa:smoke-test-ingest-with-all-dependencies',