From aafb2cb8126a674a103930b75be31797fca0b09b Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 9 Sep 2020 20:55:44 -0400 Subject: [PATCH] Support point in time cross cluster search (#61827) This commit integrates point in time into cross cluster search. Relates #61062 Closes #61790 --- .../action/search/TransportSearchAction.java | 151 ++++++++----- .../rest/action/search/RestSearchAction.java | 13 +- .../search/TransportSearchActionTests.java | 7 +- .../test/AbstractMultiClustersTestCase.java | 199 ++++++++++++++++++ .../xpack/core/search/CCSPointInTimeIT.java | 107 ++++++++++ .../action/OpenPointInTimeResponse.java | 3 +- .../TransportOpenPointInTimeAction.java | 6 +- .../build.gradle | 2 +- .../test/multi_cluster/100_resolve_index.yml | 14 +- .../test/multi_cluster/80_point_in_time.yml | 113 ++++++++++ .../test/remote_cluster/10_basic.yml | 31 ++- 11 files changed, 571 insertions(+), 75 deletions(-) create mode 100644 test/framework/src/main/java/org/elasticsearch/test/AbstractMultiClustersTestCase.java create mode 100644 x-pack/plugin/core/src/internalClusterTest/java/org/elasticsearch/xpack/core/search/CCSPointInTimeIT.java create mode 100644 x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/80_point_in_time.yml diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index b15d4d1fdb0..aab586fa47e 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -41,9 +41,9 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.cluster.routing.OperationRouting; import org.elasticsearch.cluster.routing.ShardIterator; -import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.Writeable; @@ -310,10 +310,19 @@ public class TransportSearchAction extends HandledTransportAction { - List remoteShardIterators = new ArrayList<>(); - Map remoteAliasFilters = new HashMap<>(); - BiFunction clusterNodeLookup = processRemoteShards( - searchShardsResponses, remoteClusterIndices, remoteShardIterators, remoteAliasFilters); + final BiFunction clusterNodeLookup = + getRemoteClusterNodeLookup(searchShardsResponses); + final Map remoteAliasFilters; + final List remoteShardIterators; + if (searchContext != null) { + remoteAliasFilters = searchContext.aliasFilter(); + remoteShardIterators = getRemoteShardsIteratorFromPointInTime(searchShardsResponses, + searchContext, searchRequest.pointInTimeBuilder().getKeepAlive(), remoteClusterIndices); + } else { + remoteAliasFilters = getRemoteAliasFilters(searchShardsResponses); + remoteShardIterators = getRemoteShardsIterator(searchShardsResponses, remoteClusterIndices, + remoteAliasFilters); + } int localClusters = localIndices == null ? 0 : 1; int totalClusters = remoteClusterIndices.size() + localClusters; int successfulClusters = searchShardsResponses.size() + localClusters; @@ -499,23 +508,29 @@ public class TransportSearchAction extends HandledTransportAction processRemoteShards(Map searchShardsResponses, - Map remoteIndicesByCluster, - List remoteShardIterators, - Map aliasFilterMap) { + static BiFunction getRemoteClusterNodeLookup(Map searchShardsResp) { Map> clusterToNode = new HashMap<>(); - for (Map.Entry entry : searchShardsResponses.entrySet()) { + for (Map.Entry entry : searchShardsResp.entrySet()) { String clusterAlias = entry.getKey(); - ClusterSearchShardsResponse searchShardsResponse = entry.getValue(); - HashMap idToDiscoveryNode = new HashMap<>(); - clusterToNode.put(clusterAlias, idToDiscoveryNode); - for (DiscoveryNode remoteNode : searchShardsResponse.getNodes()) { - idToDiscoveryNode.put(remoteNode.getId(), remoteNode); + for (DiscoveryNode remoteNode : entry.getValue().getNodes()) { + clusterToNode.computeIfAbsent(clusterAlias, k -> new HashMap<>()).put(remoteNode.getId(), remoteNode); } + } + return (clusterAlias, nodeId) -> { + Map clusterNodes = clusterToNode.get(clusterAlias); + if (clusterNodes == null) { + throw new IllegalArgumentException("unknown remote cluster: " + clusterAlias); + } + return clusterNodes.get(nodeId); + }; + } + + static Map getRemoteAliasFilters(Map searchShardsResp) { + final Map aliasFilterMap = new HashMap<>(); + for (Map.Entry entry : searchShardsResp.entrySet()) { + ClusterSearchShardsResponse searchShardsResponse = entry.getValue(); final Map indicesAndFilters = searchShardsResponse.getIndicesAndFilters(); for (ClusterSearchShardsGroup clusterSearchShardsGroup : searchShardsResponse.getGroups()) { - //add the cluster name to the remote index names for indices disambiguation - //this ends up in the hits returned with the search response ShardId shardId = clusterSearchShardsGroup.getShardId(); final AliasFilter aliasFilter; if (indicesAndFilters == null) { @@ -524,10 +539,26 @@ public class TransportSearchAction extends HandledTransportAction getRemoteShardsIterator(Map searchShardsResponses, + Map remoteIndicesByCluster, + Map aliasFilterMap) { + final List remoteShardIterators = new ArrayList<>(); + for (Map.Entry entry : searchShardsResponses.entrySet()) { + for (ClusterSearchShardsGroup clusterSearchShardsGroup : entry.getValue().getGroups()) { + //add the cluster name to the remote index names for indices disambiguation + //this ends up in the hits returned with the search response + ShardId shardId = clusterSearchShardsGroup.getShardId(); + AliasFilter aliasFilter = aliasFilterMap.get(shardId.getIndex().getUUID()); + String[] aliases = aliasFilter.getAliases(); + String clusterAlias = entry.getKey(); + String[] finalIndices = aliases.length == 0 ? new String[]{shardId.getIndexName()} : aliases; final OriginalIndices originalIndices = remoteIndicesByCluster.get(clusterAlias); assert originalIndices != null : "original indices are null for clusterAlias: " + clusterAlias; SearchShardIterator shardIterator = new SearchShardIterator(clusterAlias, shardId, @@ -536,13 +567,27 @@ public class TransportSearchAction extends HandledTransportAction { - Map clusterNodes = clusterToNode.get(clusterAlias); - if (clusterNodes == null) { - throw new IllegalArgumentException("unknown remote cluster: " + clusterAlias); - } - return clusterNodes.get(nodeId); - }; + return remoteShardIterators; + } + + static List getRemoteShardsIteratorFromPointInTime(Map searchShardsResponses, + SearchContextId searchContextId, + TimeValue searchContextKeepAlive, + Map remoteClusterIndices) { + final List remoteShardIterators = new ArrayList<>(); + for (Map.Entry entry : searchShardsResponses.entrySet()) { + for (ClusterSearchShardsGroup group : entry.getValue().getGroups()) { + final ShardId shardId = group.getShardId(); + final String clusterAlias = entry.getKey(); + final SearchContextIdForNode perNode = searchContextId.shards().get(shardId); + assert clusterAlias.equals(perNode.getClusterAlias()) : clusterAlias + " != " + perNode.getClusterAlias(); + final List targetNodes = Collections.singletonList(perNode.getNode()); + SearchShardIterator shardIterator = new SearchShardIterator(clusterAlias, shardId, targetNodes, + remoteClusterIndices.get(clusterAlias), perNode.getSearchContextId(), searchContextKeepAlive); + remoteShardIterators.add(shardIterator); + } + } + return remoteShardIterators; } private Index[] resolveLocalIndices(OriginalIndices localIndices, @@ -569,39 +614,34 @@ public class TransportSearchAction extends HandledTransportAction localShardIterators; final Map aliasFilter; final Map> indexRoutings; - final Executor asyncSearchExecutor; - boolean preFilterSearchShards; + final String[] concreteLocalIndices; if (searchContext != null) { assert searchRequest.pointInTimeBuilder() != null; aliasFilter = searchContext.aliasFilter(); indexRoutings = Collections.emptyMap(); - asyncSearchExecutor = asyncSearchExecutor(localIndices.indices(), clusterState); - localShardIterators = getSearchShardsFromSearchContexts(clusterState, localIndices, searchRequest.getLocalClusterAlias(), - searchContext, searchRequest.pointInTimeBuilder().getKeepAlive()); - preFilterSearchShards = shouldPreFilterSearchShards(clusterState, searchRequest, localIndices.indices(), - localShardIterators.size() + remoteShardIterators.size()); + concreteLocalIndices = localIndices == null ? new String[0] : localIndices.indices(); + localShardIterators = getLocalLocalShardsIteratorFromPointInTime(clusterState, localIndices, + searchRequest.getLocalClusterAlias(), searchContext, searchRequest.pointInTimeBuilder().getKeepAlive()); } else { final Index[] indices = resolveLocalIndices(localIndices, clusterState, timeProvider); Map> routingMap = indexNameExpressionResolver.resolveSearchRouting(clusterState, searchRequest.routing(), searchRequest.indices()); routingMap = routingMap == null ? Collections.emptyMap() : Collections.unmodifiableMap(routingMap); - final String[] concreteIndices = new String[indices.length]; + concreteLocalIndices = new String[indices.length]; for (int i = 0; i < indices.length; i++) { - concreteIndices[i] = indices[i].getName(); + concreteLocalIndices[i] = indices[i].getName(); } - asyncSearchExecutor = asyncSearchExecutor(concreteIndices, clusterState); Map nodeSearchCounts = searchTransportService.getPendingSearchRequests(); GroupShardsIterator localShardRoutings = clusterService.operationRouting().searchShards(clusterState, - concreteIndices, routingMap, searchRequest.preference(), searchService.getResponseCollectorService(), nodeSearchCounts); + concreteLocalIndices, routingMap, searchRequest.preference(), + searchService.getResponseCollectorService(), nodeSearchCounts); localShardIterators = StreamSupport.stream(localShardRoutings.spliterator(), false) .map(it -> new SearchShardIterator( searchRequest.getLocalClusterAlias(), it.shardId(), it.getShardRoutings(), localIndices)) .collect(Collectors.toList()); aliasFilter = buildPerIndexAliasFilter(searchRequest, clusterState, indices, remoteAliasMap); indexRoutings = routingMap; - preFilterSearchShards = shouldPreFilterSearchShards(clusterState, searchRequest, concreteIndices, - localShardIterators.size() + remoteShardIterators.size()); } final GroupShardsIterator shardIterators = mergeShardsIterators(localShardIterators, remoteShardIterators); @@ -631,6 +671,9 @@ public class TransportSearchAction extends HandledTransportAction connectionLookup = buildConnectionLookup(searchRequest.getLocalClusterAlias(), nodes::get, remoteConnections, searchTransportService::getConnection); + final Executor asyncSearchExecutor = asyncSearchExecutor(concreteLocalIndices, clusterState); + final boolean preFilterSearchShards = shouldPreFilterSearchShards(clusterState, searchRequest, concreteLocalIndices, + localShardIterators.size() + remoteShardIterators.size()); searchAsyncActionProvider.asyncSearchAction( task, searchRequest, asyncSearchExecutor, shardIterators, timeProvider, connectionLookup, clusterState, Collections.unmodifiableMap(aliasFilter), concreteIndexBoosts, indexRoutings, listener, @@ -874,27 +917,21 @@ public class TransportSearchAction extends HandledTransportAction new OriginalIndices(e.getValue().toArray(new String[0]), indicesOptions))); } - static List getSearchShardsFromSearchContexts(ClusterState clusterState, - OriginalIndices originalIndices, - String localClusterAlias, - SearchContextId searchContext, - TimeValue keepAlive) { + static List getLocalLocalShardsIteratorFromPointInTime(ClusterState clusterState, + OriginalIndices originalIndices, + String localClusterAlias, + SearchContextId searchContext, + TimeValue keepAlive) { final List iterators = new ArrayList<>(searchContext.shards().size()); for (Map.Entry entry : searchContext.shards().entrySet()) { - final ShardId shardId = entry.getKey(); - final ShardIterator shards = OperationRouting.getShards(clusterState, shardId); - final List matchingNodeFirst = new ArrayList<>(shards.size()); - final String nodeId = entry.getValue().getNode(); - // always search the matching node first even when its shard was relocated to another node - // because the point in time should keep the corresponding search context open. - matchingNodeFirst.add(nodeId); - for (ShardRouting shard : shards) { - if (shard.currentNodeId().equals(nodeId) == false) { - matchingNodeFirst.add(shard.currentNodeId()); - } + final SearchContextIdForNode perNode = entry.getValue(); + if (Strings.isEmpty(perNode.getClusterAlias())) { + final ShardId shardId = entry.getKey(); + OperationRouting.getShards(clusterState, shardId); + final List targetNodes = Collections.singletonList(perNode.getNode()); + iterators.add(new SearchShardIterator(localClusterAlias, shardId, targetNodes, originalIndices, + perNode.getSearchContextId(), keepAlive)); } - iterators.add(new SearchShardIterator(localClusterAlias, shardId, matchingNodeFirst, originalIndices, - entry.getValue().getSearchContextId(), keepAlive)); } return iterators; } diff --git a/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java b/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java index 6f3ee79f42b..5d5261d9a90 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java @@ -186,12 +186,14 @@ public class RestSearchAction extends BaseRestHandler { searchRequest.routing(request.param("routing")); searchRequest.preference(request.param("preference")); searchRequest.indicesOptions(IndicesOptions.fromRequest(request, searchRequest.indicesOptions())); - searchRequest.setCcsMinimizeRoundtrips(request.paramAsBoolean("ccs_minimize_roundtrips", searchRequest.isCcsMinimizeRoundtrips())); checkRestTotalHits(request, searchRequest); if (searchRequest.pointInTimeBuilder() != null) { - preparePointInTime(searchRequest, namedWriteableRegistry); + preparePointInTime(searchRequest, request, namedWriteableRegistry); + } else { + searchRequest.setCcsMinimizeRoundtrips( + request.paramAsBoolean("ccs_minimize_roundtrips", searchRequest.isCcsMinimizeRoundtrips())); } } @@ -308,7 +310,7 @@ public class RestSearchAction extends BaseRestHandler { } } - static void preparePointInTime(SearchRequest request, NamedWriteableRegistry namedWriteableRegistry) { + static void preparePointInTime(SearchRequest request, RestRequest restRequest, NamedWriteableRegistry namedWriteableRegistry) { assert request.pointInTimeBuilder() != null; ActionRequestValidationException validationException = null; if (request.indices().length > 0) { @@ -323,6 +325,11 @@ public class RestSearchAction extends BaseRestHandler { if (request.preference() != null) { validationException = addValidationError("[preference] cannot be used with point in time", validationException); } + if (restRequest.paramAsBoolean("ccs_minimize_roundtrips", false)) { + validationException = + addValidationError("[ccs_minimize_roundtrips] cannot be used with point in time", validationException); + request.setCcsMinimizeRoundtrips(false); + } ExceptionsHelper.reThrowIfNotNull(validationException); final IndicesOptions indicesOptions = request.indicesOptions(); diff --git a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java index f4c5b0a35d5..880c609d74d 100644 --- a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java @@ -208,7 +208,6 @@ public class TransportSearchActionTests extends ESTestCase { null)) { RemoteClusterService service = transportService.getRemoteClusterService(); assertFalse(service.isCrossClusterSearchEnabled()); - List iteratorList = new ArrayList<>(); Map searchShardsResponseMap = new HashMap<>(); DiscoveryNode[] nodes = new DiscoveryNode[] { new DiscoveryNode("node1", buildNewFakeTransportAddress(), Version.CURRENT), @@ -246,9 +245,9 @@ public class TransportSearchActionTests extends ESTestCase { new OriginalIndices(new String[]{"fo*", "ba*"}, SearchRequest.DEFAULT_INDICES_OPTIONS)); remoteIndicesByCluster.put("test_cluster_2", new OriginalIndices(new String[]{"x*"}, SearchRequest.DEFAULT_INDICES_OPTIONS)); - Map remoteAliases = new HashMap<>(); - TransportSearchAction.processRemoteShards(searchShardsResponseMap, remoteIndicesByCluster, iteratorList, - remoteAliases); + Map remoteAliases = TransportSearchAction.getRemoteAliasFilters(searchShardsResponseMap); + List iteratorList = + TransportSearchAction.getRemoteShardsIterator(searchShardsResponseMap, remoteIndicesByCluster, remoteAliases); assertEquals(4, iteratorList.size()); for (SearchShardIterator iterator : iteratorList) { if (iterator.shardId().getIndexName().endsWith("foo")) { diff --git a/test/framework/src/main/java/org/elasticsearch/test/AbstractMultiClustersTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/AbstractMultiClustersTestCase.java new file mode 100644 index 00000000000..7d998a6b33f --- /dev/null +++ b/test/framework/src/main/java/org/elasticsearch/test/AbstractMultiClustersTestCase.java @@ -0,0 +1,199 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.test; + +import org.elasticsearch.action.admin.cluster.remote.RemoteInfoAction; +import org.elasticsearch.action.admin.cluster.remote.RemoteInfoRequest; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.network.NetworkModule; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.internal.io.IOUtils; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.transport.RemoteClusterAware; +import org.elasticsearch.transport.RemoteConnectionInfo; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.transport.nio.MockNioTransportPlugin; +import org.junit.AfterClass; +import org.junit.Before; + +import java.io.Closeable; +import java.io.IOException; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.elasticsearch.discovery.DiscoveryModule.DISCOVERY_SEED_PROVIDERS_SETTING; +import static org.elasticsearch.discovery.SettingsBasedSeedHostsProvider.DISCOVERY_SEED_HOSTS_SETTING; +import static org.hamcrest.Matchers.hasKey; +import static org.hamcrest.Matchers.hasSize; + +public abstract class AbstractMultiClustersTestCase extends ESTestCase { + public static final String LOCAL_CLUSTER = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY; + + private static volatile ClusterGroup clusterGroup; + + protected Collection remoteClusterAlias() { + return randomSubsetOf(Arrays.asList("cluster-a", "cluster-b")); + } + + protected Collection> nodePlugins(String clusterAlias) { + return Collections.emptyList(); + } + + protected final Client client() { + return client(LOCAL_CLUSTER); + } + + protected final Client client(String clusterAlias) { + return cluster(clusterAlias).client(); + } + + protected final InternalTestCluster cluster(String clusterAlias) { + return clusterGroup.getCluster(clusterAlias); + } + + protected final Map clusters() { + return Collections.unmodifiableMap(clusterGroup.clusters); + } + + protected boolean reuseClusters() { + return true; + } + + @Before + public final void startClusters() throws Exception { + if (clusterGroup != null && reuseClusters()) { + return; + } + stopClusters(); + final Map clusters = new HashMap<>(); + final List clusterAliases = new ArrayList<>(remoteClusterAlias()); + clusterAliases.add(LOCAL_CLUSTER); + for (String clusterAlias : clusterAliases) { + final String clusterName = clusterAlias.equals(LOCAL_CLUSTER) ? "main-cluster" : clusterAlias; + final int numberOfNodes = randomIntBetween(1, 3); + final List> mockPlugins = + Arrays.asList(MockHttpTransport.TestPlugin.class, MockTransportService.TestPlugin.class, MockNioTransportPlugin.class); + final Collection> nodePlugins = nodePlugins(clusterAlias); + final Settings nodeSettings = Settings.EMPTY; + final NodeConfigurationSource nodeConfigurationSource = nodeConfigurationSource(nodeSettings, nodePlugins); + final InternalTestCluster cluster = new InternalTestCluster(randomLong(), createTempDir(), true, true, numberOfNodes, + numberOfNodes, clusterName, nodeConfigurationSource, 0, clusterName + "-", mockPlugins, Function.identity()); + cluster.beforeTest(random(), 0); + clusters.put(clusterAlias, cluster); + } + clusterGroup = new ClusterGroup(clusters); + configureRemoteClusters(); + } + + @AfterClass + public static void stopClusters() throws IOException { + IOUtils.close(clusterGroup); + clusterGroup = null; + } + + private void configureRemoteClusters() throws Exception { + Map> seedNodes = new HashMap<>(); + for (String clusterAlias : clusterGroup.clusterAliases()) { + if (clusterAlias.equals(LOCAL_CLUSTER) == false) { + final InternalTestCluster cluster = clusterGroup.getCluster(clusterAlias); + final String[] allNodes = cluster.getNodeNames(); + final List selectedNodes = randomSubsetOf(randomIntBetween(1, Math.min(3, allNodes.length)), allNodes); + seedNodes.put(clusterAlias, selectedNodes); + } + } + if (seedNodes.isEmpty()) { + return; + } + Settings.Builder settings = Settings.builder(); + for (Map.Entry> entry : seedNodes.entrySet()) { + final String clusterAlias = entry.getKey(); + final String seeds = entry.getValue().stream() + .map(node -> cluster(clusterAlias).getInstance(TransportService.class, node).boundAddress().publishAddress().toString()) + .collect(Collectors.joining(",")); + settings.put("cluster.remote." + clusterAlias + ".seeds", seeds); + } + client().admin().cluster().prepareUpdateSettings().setPersistentSettings(settings).get(); + assertBusy(() -> { + List remoteConnectionInfos = client() + .execute(RemoteInfoAction.INSTANCE, new RemoteInfoRequest()).actionGet().getInfos() + .stream().filter(RemoteConnectionInfo::isConnected) + .collect(Collectors.toList()); + final long totalConnections = seedNodes.values().stream().map(List::size).count(); + assertThat(remoteConnectionInfos, hasSize(Math.toIntExact(totalConnections))); + }); + } + + static class ClusterGroup implements Closeable { + private final Map clusters; + + ClusterGroup(Map clusters) { + this.clusters = Collections.unmodifiableMap(clusters); + } + + InternalTestCluster getCluster(String clusterAlias) { + assertThat(clusters, hasKey(clusterAlias)); + return clusters.get(clusterAlias); + } + + Set clusterAliases() { + return clusters.keySet(); + } + + @Override + public void close() throws IOException { + IOUtils.close(clusters.values()); + } + } + + static NodeConfigurationSource nodeConfigurationSource(Settings nodeSettings, Collection> nodePlugins) { + final Settings.Builder builder = Settings.builder(); + builder.putList(DISCOVERY_SEED_HOSTS_SETTING.getKey()); // empty list disables a port scan for other nodes + builder.putList(DISCOVERY_SEED_PROVIDERS_SETTING.getKey(), "file"); + builder.put(NetworkModule.TRANSPORT_TYPE_KEY, getTestTransportType()); + builder.put(nodeSettings); + + return new NodeConfigurationSource() { + @Override + public Settings nodeSettings(int nodeOrdinal) { + return builder.build(); + } + + @Override + public Path nodeConfigPath(int nodeOrdinal) { + return null; + } + + @Override + public Collection> nodePlugins() { + return nodePlugins; + } + }; + } +} diff --git a/x-pack/plugin/core/src/internalClusterTest/java/org/elasticsearch/xpack/core/search/CCSPointInTimeIT.java b/x-pack/plugin/core/src/internalClusterTest/java/org/elasticsearch/xpack/core/search/CCSPointInTimeIT.java new file mode 100644 index 00000000000..426a9ed2f4d --- /dev/null +++ b/x-pack/plugin/core/src/internalClusterTest/java/org/elasticsearch/xpack/core/search/CCSPointInTimeIT.java @@ -0,0 +1,107 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core.search; + +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.query.MatchAllQueryBuilder; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.AbstractMultiClustersTestCase; +import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin; +import org.elasticsearch.xpack.core.search.action.ClosePointInTimeAction; +import org.elasticsearch.xpack.core.search.action.ClosePointInTimeRequest; +import org.elasticsearch.xpack.core.search.action.OpenPointInTimeAction; +import org.elasticsearch.xpack.core.search.action.OpenPointInTimeRequest; +import org.elasticsearch.xpack.core.search.action.OpenPointInTimeResponse; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; + +public class CCSPointInTimeIT extends AbstractMultiClustersTestCase { + + @Override + protected Collection remoteClusterAlias() { + return Collections.singletonList("remote_cluster"); + } + + @Override + protected Collection> nodePlugins(String clusterAlias) { + final List> plugins = new ArrayList<>(super.nodePlugins(clusterAlias)); + plugins.add(LocalStateCompositeXPackPlugin.class); + return plugins; + } + + void indexDocs(Client client, String index, int numDocs) { + for (int i = 0; i < numDocs; i++) { + String id = Integer.toString(i); + client.prepareIndex(index, "_doc").setId(id).setSource("value", i).get(); + } + client.admin().indices().prepareRefresh(index).get(); + } + + public void testBasic() { + final Client localClient = client(LOCAL_CLUSTER); + final Client remoteClient = client("remote_cluster"); + int localNumDocs = randomIntBetween(10, 50); + assertAcked(localClient.admin().indices().prepareCreate("local_test")); + indexDocs(localClient, "local_test", localNumDocs); + + int remoteNumDocs = randomIntBetween(10, 50); + assertAcked(remoteClient.admin().indices().prepareCreate("remote_test")); + indexDocs(remoteClient, "remote_test", remoteNumDocs); + boolean includeLocalIndex = randomBoolean(); + List indices = new ArrayList<>(); + if (includeLocalIndex) { + indices.add( randomFrom("*", "local_*", "local_test")); + } + indices.add(randomFrom("*:*", "remote_cluster:*", "remote_cluster:remote_test")); + String pitId = openPointInTime(indices.toArray(new String[0]), TimeValue.timeValueMinutes(2)); + try { + if (randomBoolean()) { + localClient.prepareIndex("local_test", "_doc").setId("local_new").setSource().get(); + localClient.admin().indices().prepareRefresh().get(); + } + if (randomBoolean()) { + remoteClient.prepareIndex("remote_test", "_doc").setId("remote_new").setSource().get(); + remoteClient.admin().indices().prepareRefresh().get(); + } + SearchResponse resp = localClient.prepareSearch() + .setPreference(null) + .setQuery(new MatchAllQueryBuilder()) + .setSearchContext(pitId, TimeValue.timeValueMinutes(2)) + .setSize(1000) + .get(); + assertNoFailures(resp); + assertHitCount(resp, (includeLocalIndex ? localNumDocs : 0) + remoteNumDocs); + } finally { + closePointInTime(pitId); + } + } + + private String openPointInTime(String[] indices, TimeValue keepAlive) { + OpenPointInTimeRequest request = new OpenPointInTimeRequest( + indices, + OpenPointInTimeRequest.DEFAULT_INDICES_OPTIONS, + keepAlive, + null, + null + ); + final OpenPointInTimeResponse response = client().execute(OpenPointInTimeAction.INSTANCE, request).actionGet(); + return response.getSearchContextId(); + } + + private void closePointInTime(String readerId) { + client().execute(ClosePointInTimeAction.INSTANCE, new ClosePointInTimeRequest(readerId)).actionGet(); + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/OpenPointInTimeResponse.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/OpenPointInTimeResponse.java index bf040396878..04beae9ad83 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/OpenPointInTimeResponse.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/OpenPointInTimeResponse.java @@ -14,6 +14,7 @@ import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import java.io.IOException; +import java.util.Objects; public final class OpenPointInTimeResponse extends ActionResponse implements ToXContentObject { private static final ParseField ID = new ParseField("id"); @@ -21,7 +22,7 @@ public final class OpenPointInTimeResponse extends ActionResponse implements ToX private final String searchContextId; public OpenPointInTimeResponse(String searchContextId) { - this.searchContextId = searchContextId; + this.searchContextId = Objects.requireNonNull(searchContextId); } public OpenPointInTimeResponse(StreamInput in) throws IOException { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/TransportOpenPointInTimeAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/TransportOpenPointInTimeAction.java index d6cb1ee7591..5ebb4731ec5 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/TransportOpenPointInTimeAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/TransportOpenPointInTimeAction.java @@ -72,6 +72,7 @@ public class TransportOpenPointInTimeAction extends HandledTransportAction(phaseListener, ShardOpenReaderResponse::new) ); }, - ActionListener.map(listener, r -> new OpenPointInTimeResponse(r.pointInTimeId())) + ActionListener.map(listener, r -> { + assert r.pointInTimeId() != null : r; + return new OpenPointInTimeResponse(r.pointInTimeId()); + }) ); } diff --git a/x-pack/qa/multi-cluster-search-security/build.gradle b/x-pack/qa/multi-cluster-search-security/build.gradle index 9e7196a4db6..aa745149953 100644 --- a/x-pack/qa/multi-cluster-search-security/build.gradle +++ b/x-pack/qa/multi-cluster-search-security/build.gradle @@ -10,7 +10,7 @@ dependencies { restResources { restApi { - includeXpack 'security', 'async_search', 'indices' + includeXpack 'security', 'async_search', 'indices', 'open_point_in_time', 'close_point_in_time' } } diff --git a/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/100_resolve_index.yml b/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/100_resolve_index.yml index b8436f64bcd..0bfc91fe861 100644 --- a/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/100_resolve_index.yml +++ b/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/100_resolve_index.yml @@ -30,13 +30,15 @@ - match: {indices.5.attributes.0: open} - match: {indices.6.name: my_remote_cluster:field_caps_index_3} - match: {indices.6.attributes.0: open} - - match: {indices.7.name: my_remote_cluster:secured_via_alias} - - match: {indices.7.attributes.0: open} - - match: {indices.8.name: my_remote_cluster:single_doc_index} + - match: {indices.7.name: my_remote_cluster:point_in_time_index } + - match: {indices.7.attributes.0: open } + - match: {indices.8.name: my_remote_cluster:secured_via_alias} - match: {indices.8.attributes.0: open} - - match: {indices.9.name: my_remote_cluster:test_index} - - match: {indices.9.aliases.0: aliased_test_index} - - match: {indices.9.attributes.0: open} + - match: {indices.9.name: my_remote_cluster:single_doc_index} + - match: {indices.10.attributes.0: open} + - match: {indices.10.name: my_remote_cluster:test_index} + - match: {indices.10.aliases.0: aliased_test_index} + - match: {indices.10.attributes.0: open} - match: {aliases.0.name: my_remote_cluster:.security} - match: {aliases.0.indices.0: .security-7} - match: {aliases.1.name: my_remote_cluster:aliased_closed_index} diff --git a/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/80_point_in_time.yml b/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/80_point_in_time.yml new file mode 100644 index 00000000000..0aac5ce25e5 --- /dev/null +++ b/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/80_point_in_time.yml @@ -0,0 +1,113 @@ +--- +setup: + - skip: + features: headers + + - do: + cluster.health: + wait_for_status: yellow + - do: + security.put_user: + username: "joe" + body: > + { + "password": "s3krit", + "roles" : [ "x_cluster_role" ] + } + - do: + security.put_role: + name: "x_cluster_role" + body: > + { + "cluster": [], + "indices": [ + { + "names": ["local_pit", "my_remote_cluster:point_in_time_index"], + "privileges": ["read"] + } + ] + } + + - do: + security.put_user: + username: "remote" + body: > + { + "password": "s3krit", + "roles" : [ "remote_ccs" ] + } + - do: + security.put_role: + name: "remote_ccs" + body: > + { + } +--- +teardown: + - do: + security.delete_user: + username: "joe" + ignore: 404 + - do: + security.delete_role: + name: "x_cluster_role" + ignore: 404 +--- +"Search with point in time": + + - do: + indices.create: + index: local_pit + body: + settings: + index: + number_of_shards: 2 + number_of_replicas: 0 + mappings: + properties: + created_at: + type: date + format: "yyyy-MM-dd" + - do: + bulk: + refresh: true + body: + - '{"index": {"_index": "local_pit"}}' + - '{"f": "l1", "created_at" : "2020-01-01"}' + - '{"index": {"_index": "local_pit"}}' + - '{"f": "l2", "created_at" : "2021-01-02"}' + + - do: + headers: { Authorization: "Basic am9lOnMza3JpdA==" } + open_point_in_time: + index: my_remote_cluster:point_in_time_index,local_pit + keep_alive: 5m + - set: {id: pit_id} + + - do: + headers: { Authorization: "Basic am9lOnMza3JpdA==" } + search: + rest_total_hits_as_int: true + sort: created_at + body: + query: + range: + created_at: + gte: "2020-01-03" + pit: + id: "$pit_id" + keep_alive: 1m + + - match: { hits.total: 3 } + - match: { hits.hits.0._index: "my_remote_cluster:point_in_time_index" } + - match: { hits.hits.0._source.f: "r3" } + - match: { hits.hits.1._index: "my_remote_cluster:point_in_time_index" } + - match: { hits.hits.1._source.f: "r4" } + - match: { hits.hits.2._index: "local_pit" } + - match: { hits.hits.2._source.f: "l2" } + + - do: + headers: { Authorization: "Basic am9lOnMza3JpdA==" } + close_point_in_time: + body: + id: "$pit_id" diff --git a/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/remote_cluster/10_basic.yml b/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/remote_cluster/10_basic.yml index a87eb9bd63b..46ffe740794 100644 --- a/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/remote_cluster/10_basic.yml +++ b/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/remote_cluster/10_basic.yml @@ -23,7 +23,7 @@ setup: "indices": [ { "names": ["single_doc_index", "secure_alias", "test_index", "aliased_test_index", "field_caps_index_1", - "field_caps_index_3"], + "field_caps_index_3", "point_in_time_index"], "privileges": ["read", "read_cross_cluster"] } ] @@ -46,7 +46,7 @@ setup: "indices": [ { "names": ["single_doc_index", "secure_alias", "test_index", "aliased_test_index", "field_caps_index_1", - "field_caps_index_3"], + "field_caps_index_3", "point_in_time_index"], "privileges": ["read", "read_cross_cluster"] } ] @@ -272,3 +272,30 @@ setup: "roles" : [ ] } - match: { created: false } + + - do: + indices.create: + index: point_in_time_index + body: + settings: + index: + number_of_shards: 2 + number_of_replicas: 0 + mappings: + properties: + created_at: + type: date + format: "yyyy-MM-dd" + - do: + bulk: + refresh: true + body: + - '{"index": {"_index": "point_in_time_index"}}' + - '{"f": "r1", "created_at" : "2020-01-01"}' + - '{"index": {"_index": "point_in_time_index"}}' + - '{"f": "r2", "created_at" : "2020-01-02"}' + - '{"index": {"_index": "point_in_time_index"}}' + - '{"f": "r3", "created_at" : "2020-01-03"}' + - '{"index": {"_index": "point_in_time_index"}}' + - '{"f": "r4", "created_at" : "2020-01-04"}' +