Support point in time cross cluster search (#61827)

This commit integrates point in time into cross cluster search.

Relates #61062
Closes #61790
This commit is contained in:
Nhat Nguyen 2020-09-09 20:55:44 -04:00
parent 808c8689ac
commit aafb2cb812
11 changed files with 571 additions and 75 deletions

View File

@ -41,9 +41,9 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.OperationRouting;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.Writeable;
@ -310,10 +310,19 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
skippedClusters, remoteClusterIndices, remoteClusterService, threadPool,
ActionListener.wrap(
searchShardsResponses -> {
List<SearchShardIterator> remoteShardIterators = new ArrayList<>();
Map<String, AliasFilter> remoteAliasFilters = new HashMap<>();
BiFunction<String, String, DiscoveryNode> clusterNodeLookup = processRemoteShards(
searchShardsResponses, remoteClusterIndices, remoteShardIterators, remoteAliasFilters);
final BiFunction<String, String, DiscoveryNode> clusterNodeLookup =
getRemoteClusterNodeLookup(searchShardsResponses);
final Map<String, AliasFilter> remoteAliasFilters;
final List<SearchShardIterator> remoteShardIterators;
if (searchContext != null) {
remoteAliasFilters = searchContext.aliasFilter();
remoteShardIterators = getRemoteShardsIteratorFromPointInTime(searchShardsResponses,
searchContext, searchRequest.pointInTimeBuilder().getKeepAlive(), remoteClusterIndices);
} else {
remoteAliasFilters = getRemoteAliasFilters(searchShardsResponses);
remoteShardIterators = getRemoteShardsIterator(searchShardsResponses, remoteClusterIndices,
remoteAliasFilters);
}
int localClusters = localIndices == null ? 0 : 1;
int totalClusters = remoteClusterIndices.size() + localClusters;
int successfulClusters = searchShardsResponses.size() + localClusters;
@ -499,41 +508,12 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
searchContext, searchAsyncActionProvider);
}
static BiFunction<String, String, DiscoveryNode> processRemoteShards(Map<String, ClusterSearchShardsResponse> searchShardsResponses,
Map<String, OriginalIndices> remoteIndicesByCluster,
List<SearchShardIterator> remoteShardIterators,
Map<String, AliasFilter> aliasFilterMap) {
static BiFunction<String, String, DiscoveryNode> getRemoteClusterNodeLookup(Map<String, ClusterSearchShardsResponse> searchShardsResp) {
Map<String, Map<String, DiscoveryNode>> clusterToNode = new HashMap<>();
for (Map.Entry<String, ClusterSearchShardsResponse> entry : searchShardsResponses.entrySet()) {
for (Map.Entry<String, ClusterSearchShardsResponse> entry : searchShardsResp.entrySet()) {
String clusterAlias = entry.getKey();
ClusterSearchShardsResponse searchShardsResponse = entry.getValue();
HashMap<String, DiscoveryNode> idToDiscoveryNode = new HashMap<>();
clusterToNode.put(clusterAlias, idToDiscoveryNode);
for (DiscoveryNode remoteNode : searchShardsResponse.getNodes()) {
idToDiscoveryNode.put(remoteNode.getId(), remoteNode);
}
final Map<String, AliasFilter> indicesAndFilters = searchShardsResponse.getIndicesAndFilters();
for (ClusterSearchShardsGroup clusterSearchShardsGroup : searchShardsResponse.getGroups()) {
//add the cluster name to the remote index names for indices disambiguation
//this ends up in the hits returned with the search response
ShardId shardId = clusterSearchShardsGroup.getShardId();
final AliasFilter aliasFilter;
if (indicesAndFilters == null) {
aliasFilter = AliasFilter.EMPTY;
} else {
aliasFilter = indicesAndFilters.get(shardId.getIndexName());
assert aliasFilter != null : "alias filter must not be null for index: " + shardId.getIndex();
}
String[] aliases = aliasFilter.getAliases();
String[] finalIndices = aliases.length == 0 ? new String[] {shardId.getIndexName()} : aliases;
// here we have to map the filters to the UUID since from now on we use the uuid for the lookup
aliasFilterMap.put(shardId.getIndex().getUUID(), aliasFilter);
final OriginalIndices originalIndices = remoteIndicesByCluster.get(clusterAlias);
assert originalIndices != null : "original indices are null for clusterAlias: " + clusterAlias;
SearchShardIterator shardIterator = new SearchShardIterator(clusterAlias, shardId,
Arrays.asList(clusterSearchShardsGroup.getShards()), new OriginalIndices(finalIndices,
originalIndices.indicesOptions()));
remoteShardIterators.add(shardIterator);
for (DiscoveryNode remoteNode : entry.getValue().getNodes()) {
clusterToNode.computeIfAbsent(clusterAlias, k -> new HashMap<>()).put(remoteNode.getId(), remoteNode);
}
}
return (clusterAlias, nodeId) -> {
@ -545,6 +525,71 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
};
}
static Map<String, AliasFilter> getRemoteAliasFilters(Map<String, ClusterSearchShardsResponse> searchShardsResp) {
final Map<String, AliasFilter> aliasFilterMap = new HashMap<>();
for (Map.Entry<String, ClusterSearchShardsResponse> entry : searchShardsResp.entrySet()) {
ClusterSearchShardsResponse searchShardsResponse = entry.getValue();
final Map<String, AliasFilter> indicesAndFilters = searchShardsResponse.getIndicesAndFilters();
for (ClusterSearchShardsGroup clusterSearchShardsGroup : searchShardsResponse.getGroups()) {
ShardId shardId = clusterSearchShardsGroup.getShardId();
final AliasFilter aliasFilter;
if (indicesAndFilters == null) {
aliasFilter = AliasFilter.EMPTY;
} else {
aliasFilter = indicesAndFilters.get(shardId.getIndexName());
assert aliasFilter != null : "alias filter must not be null for index: " + shardId.getIndex();
}
// here we have to map the filters to the UUID since from now on we use the uuid for the lookup
aliasFilterMap.put(shardId.getIndex().getUUID(), aliasFilter);
}
}
return aliasFilterMap;
}
static List<SearchShardIterator> getRemoteShardsIterator(Map<String, ClusterSearchShardsResponse> searchShardsResponses,
Map<String, OriginalIndices> remoteIndicesByCluster,
Map<String, AliasFilter> aliasFilterMap) {
final List<SearchShardIterator> remoteShardIterators = new ArrayList<>();
for (Map.Entry<String, ClusterSearchShardsResponse> entry : searchShardsResponses.entrySet()) {
for (ClusterSearchShardsGroup clusterSearchShardsGroup : entry.getValue().getGroups()) {
//add the cluster name to the remote index names for indices disambiguation
//this ends up in the hits returned with the search response
ShardId shardId = clusterSearchShardsGroup.getShardId();
AliasFilter aliasFilter = aliasFilterMap.get(shardId.getIndex().getUUID());
String[] aliases = aliasFilter.getAliases();
String clusterAlias = entry.getKey();
String[] finalIndices = aliases.length == 0 ? new String[]{shardId.getIndexName()} : aliases;
final OriginalIndices originalIndices = remoteIndicesByCluster.get(clusterAlias);
assert originalIndices != null : "original indices are null for clusterAlias: " + clusterAlias;
SearchShardIterator shardIterator = new SearchShardIterator(clusterAlias, shardId,
Arrays.asList(clusterSearchShardsGroup.getShards()), new OriginalIndices(finalIndices,
originalIndices.indicesOptions()));
remoteShardIterators.add(shardIterator);
}
}
return remoteShardIterators;
}
static List<SearchShardIterator> getRemoteShardsIteratorFromPointInTime(Map<String, ClusterSearchShardsResponse> searchShardsResponses,
SearchContextId searchContextId,
TimeValue searchContextKeepAlive,
Map<String, OriginalIndices> remoteClusterIndices) {
final List<SearchShardIterator> remoteShardIterators = new ArrayList<>();
for (Map.Entry<String, ClusterSearchShardsResponse> entry : searchShardsResponses.entrySet()) {
for (ClusterSearchShardsGroup group : entry.getValue().getGroups()) {
final ShardId shardId = group.getShardId();
final String clusterAlias = entry.getKey();
final SearchContextIdForNode perNode = searchContextId.shards().get(shardId);
assert clusterAlias.equals(perNode.getClusterAlias()) : clusterAlias + " != " + perNode.getClusterAlias();
final List<String> targetNodes = Collections.singletonList(perNode.getNode());
SearchShardIterator shardIterator = new SearchShardIterator(clusterAlias, shardId, targetNodes,
remoteClusterIndices.get(clusterAlias), perNode.getSearchContextId(), searchContextKeepAlive);
remoteShardIterators.add(shardIterator);
}
}
return remoteShardIterators;
}
private Index[] resolveLocalIndices(OriginalIndices localIndices,
ClusterState clusterState,
SearchTimeProvider timeProvider) {
@ -569,39 +614,34 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
final List<SearchShardIterator> localShardIterators;
final Map<String, AliasFilter> aliasFilter;
final Map<String, Set<String>> indexRoutings;
final Executor asyncSearchExecutor;
boolean preFilterSearchShards;
final String[] concreteLocalIndices;
if (searchContext != null) {
assert searchRequest.pointInTimeBuilder() != null;
aliasFilter = searchContext.aliasFilter();
indexRoutings = Collections.emptyMap();
asyncSearchExecutor = asyncSearchExecutor(localIndices.indices(), clusterState);
localShardIterators = getSearchShardsFromSearchContexts(clusterState, localIndices, searchRequest.getLocalClusterAlias(),
searchContext, searchRequest.pointInTimeBuilder().getKeepAlive());
preFilterSearchShards = shouldPreFilterSearchShards(clusterState, searchRequest, localIndices.indices(),
localShardIterators.size() + remoteShardIterators.size());
concreteLocalIndices = localIndices == null ? new String[0] : localIndices.indices();
localShardIterators = getLocalLocalShardsIteratorFromPointInTime(clusterState, localIndices,
searchRequest.getLocalClusterAlias(), searchContext, searchRequest.pointInTimeBuilder().getKeepAlive());
} else {
final Index[] indices = resolveLocalIndices(localIndices, clusterState, timeProvider);
Map<String, Set<String>> routingMap = indexNameExpressionResolver.resolveSearchRouting(clusterState, searchRequest.routing(),
searchRequest.indices());
routingMap = routingMap == null ? Collections.emptyMap() : Collections.unmodifiableMap(routingMap);
final String[] concreteIndices = new String[indices.length];
concreteLocalIndices = new String[indices.length];
for (int i = 0; i < indices.length; i++) {
concreteIndices[i] = indices[i].getName();
concreteLocalIndices[i] = indices[i].getName();
}
asyncSearchExecutor = asyncSearchExecutor(concreteIndices, clusterState);
Map<String, Long> nodeSearchCounts = searchTransportService.getPendingSearchRequests();
GroupShardsIterator<ShardIterator> localShardRoutings = clusterService.operationRouting().searchShards(clusterState,
concreteIndices, routingMap, searchRequest.preference(), searchService.getResponseCollectorService(), nodeSearchCounts);
concreteLocalIndices, routingMap, searchRequest.preference(),
searchService.getResponseCollectorService(), nodeSearchCounts);
localShardIterators = StreamSupport.stream(localShardRoutings.spliterator(), false)
.map(it -> new SearchShardIterator(
searchRequest.getLocalClusterAlias(), it.shardId(), it.getShardRoutings(), localIndices))
.collect(Collectors.toList());
aliasFilter = buildPerIndexAliasFilter(searchRequest, clusterState, indices, remoteAliasMap);
indexRoutings = routingMap;
preFilterSearchShards = shouldPreFilterSearchShards(clusterState, searchRequest, concreteIndices,
localShardIterators.size() + remoteShardIterators.size());
}
final GroupShardsIterator<SearchShardIterator> shardIterators = mergeShardsIterators(localShardIterators, remoteShardIterators);
@ -631,6 +671,9 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
final DiscoveryNodes nodes = clusterState.nodes();
BiFunction<String, String, Transport.Connection> connectionLookup = buildConnectionLookup(searchRequest.getLocalClusterAlias(),
nodes::get, remoteConnections, searchTransportService::getConnection);
final Executor asyncSearchExecutor = asyncSearchExecutor(concreteLocalIndices, clusterState);
final boolean preFilterSearchShards = shouldPreFilterSearchShards(clusterState, searchRequest, concreteLocalIndices,
localShardIterators.size() + remoteShardIterators.size());
searchAsyncActionProvider.asyncSearchAction(
task, searchRequest, asyncSearchExecutor, shardIterators, timeProvider, connectionLookup, clusterState,
Collections.unmodifiableMap(aliasFilter), concreteIndexBoosts, indexRoutings, listener,
@ -874,28 +917,22 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
.collect(Collectors.toMap(Map.Entry::getKey, e -> new OriginalIndices(e.getValue().toArray(new String[0]), indicesOptions)));
}
static List<SearchShardIterator> getSearchShardsFromSearchContexts(ClusterState clusterState,
static List<SearchShardIterator> getLocalLocalShardsIteratorFromPointInTime(ClusterState clusterState,
OriginalIndices originalIndices,
String localClusterAlias,
SearchContextId searchContext,
TimeValue keepAlive) {
final List<SearchShardIterator> iterators = new ArrayList<>(searchContext.shards().size());
for (Map.Entry<ShardId, SearchContextIdForNode> entry : searchContext.shards().entrySet()) {
final SearchContextIdForNode perNode = entry.getValue();
if (Strings.isEmpty(perNode.getClusterAlias())) {
final ShardId shardId = entry.getKey();
final ShardIterator shards = OperationRouting.getShards(clusterState, shardId);
final List<String> matchingNodeFirst = new ArrayList<>(shards.size());
final String nodeId = entry.getValue().getNode();
// always search the matching node first even when its shard was relocated to another node
// because the point in time should keep the corresponding search context open.
matchingNodeFirst.add(nodeId);
for (ShardRouting shard : shards) {
if (shard.currentNodeId().equals(nodeId) == false) {
matchingNodeFirst.add(shard.currentNodeId());
OperationRouting.getShards(clusterState, shardId);
final List<String> targetNodes = Collections.singletonList(perNode.getNode());
iterators.add(new SearchShardIterator(localClusterAlias, shardId, targetNodes, originalIndices,
perNode.getSearchContextId(), keepAlive));
}
}
iterators.add(new SearchShardIterator(localClusterAlias, shardId, matchingNodeFirst, originalIndices,
entry.getValue().getSearchContextId(), keepAlive));
}
return iterators;
}
}

View File

@ -186,12 +186,14 @@ public class RestSearchAction extends BaseRestHandler {
searchRequest.routing(request.param("routing"));
searchRequest.preference(request.param("preference"));
searchRequest.indicesOptions(IndicesOptions.fromRequest(request, searchRequest.indicesOptions()));
searchRequest.setCcsMinimizeRoundtrips(request.paramAsBoolean("ccs_minimize_roundtrips", searchRequest.isCcsMinimizeRoundtrips()));
checkRestTotalHits(request, searchRequest);
if (searchRequest.pointInTimeBuilder() != null) {
preparePointInTime(searchRequest, namedWriteableRegistry);
preparePointInTime(searchRequest, request, namedWriteableRegistry);
} else {
searchRequest.setCcsMinimizeRoundtrips(
request.paramAsBoolean("ccs_minimize_roundtrips", searchRequest.isCcsMinimizeRoundtrips()));
}
}
@ -308,7 +310,7 @@ public class RestSearchAction extends BaseRestHandler {
}
}
static void preparePointInTime(SearchRequest request, NamedWriteableRegistry namedWriteableRegistry) {
static void preparePointInTime(SearchRequest request, RestRequest restRequest, NamedWriteableRegistry namedWriteableRegistry) {
assert request.pointInTimeBuilder() != null;
ActionRequestValidationException validationException = null;
if (request.indices().length > 0) {
@ -323,6 +325,11 @@ public class RestSearchAction extends BaseRestHandler {
if (request.preference() != null) {
validationException = addValidationError("[preference] cannot be used with point in time", validationException);
}
if (restRequest.paramAsBoolean("ccs_minimize_roundtrips", false)) {
validationException =
addValidationError("[ccs_minimize_roundtrips] cannot be used with point in time", validationException);
request.setCcsMinimizeRoundtrips(false);
}
ExceptionsHelper.reThrowIfNotNull(validationException);
final IndicesOptions indicesOptions = request.indicesOptions();

View File

@ -208,7 +208,6 @@ public class TransportSearchActionTests extends ESTestCase {
null)) {
RemoteClusterService service = transportService.getRemoteClusterService();
assertFalse(service.isCrossClusterSearchEnabled());
List<SearchShardIterator> iteratorList = new ArrayList<>();
Map<String, ClusterSearchShardsResponse> searchShardsResponseMap = new HashMap<>();
DiscoveryNode[] nodes = new DiscoveryNode[] {
new DiscoveryNode("node1", buildNewFakeTransportAddress(), Version.CURRENT),
@ -246,9 +245,9 @@ public class TransportSearchActionTests extends ESTestCase {
new OriginalIndices(new String[]{"fo*", "ba*"}, SearchRequest.DEFAULT_INDICES_OPTIONS));
remoteIndicesByCluster.put("test_cluster_2",
new OriginalIndices(new String[]{"x*"}, SearchRequest.DEFAULT_INDICES_OPTIONS));
Map<String, AliasFilter> remoteAliases = new HashMap<>();
TransportSearchAction.processRemoteShards(searchShardsResponseMap, remoteIndicesByCluster, iteratorList,
remoteAliases);
Map<String, AliasFilter> remoteAliases = TransportSearchAction.getRemoteAliasFilters(searchShardsResponseMap);
List<SearchShardIterator> iteratorList =
TransportSearchAction.getRemoteShardsIterator(searchShardsResponseMap, remoteIndicesByCluster, remoteAliases);
assertEquals(4, iteratorList.size());
for (SearchShardIterator iterator : iteratorList) {
if (iterator.shardId().getIndexName().endsWith("foo")) {

View File

@ -0,0 +1,199 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test;
import org.elasticsearch.action.admin.cluster.remote.RemoteInfoAction;
import org.elasticsearch.action.admin.cluster.remote.RemoteInfoRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.transport.RemoteConnectionInfo;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.nio.MockNioTransportPlugin;
import org.junit.AfterClass;
import org.junit.Before;
import java.io.Closeable;
import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import static org.elasticsearch.discovery.DiscoveryModule.DISCOVERY_SEED_PROVIDERS_SETTING;
import static org.elasticsearch.discovery.SettingsBasedSeedHostsProvider.DISCOVERY_SEED_HOSTS_SETTING;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.hasSize;
public abstract class AbstractMultiClustersTestCase extends ESTestCase {
public static final String LOCAL_CLUSTER = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY;
private static volatile ClusterGroup clusterGroup;
protected Collection<String> remoteClusterAlias() {
return randomSubsetOf(Arrays.asList("cluster-a", "cluster-b"));
}
protected Collection<Class<? extends Plugin>> nodePlugins(String clusterAlias) {
return Collections.emptyList();
}
protected final Client client() {
return client(LOCAL_CLUSTER);
}
protected final Client client(String clusterAlias) {
return cluster(clusterAlias).client();
}
protected final InternalTestCluster cluster(String clusterAlias) {
return clusterGroup.getCluster(clusterAlias);
}
protected final Map<String, InternalTestCluster> clusters() {
return Collections.unmodifiableMap(clusterGroup.clusters);
}
protected boolean reuseClusters() {
return true;
}
@Before
public final void startClusters() throws Exception {
if (clusterGroup != null && reuseClusters()) {
return;
}
stopClusters();
final Map<String, InternalTestCluster> clusters = new HashMap<>();
final List<String> clusterAliases = new ArrayList<>(remoteClusterAlias());
clusterAliases.add(LOCAL_CLUSTER);
for (String clusterAlias : clusterAliases) {
final String clusterName = clusterAlias.equals(LOCAL_CLUSTER) ? "main-cluster" : clusterAlias;
final int numberOfNodes = randomIntBetween(1, 3);
final List<Class<? extends Plugin>> mockPlugins =
Arrays.asList(MockHttpTransport.TestPlugin.class, MockTransportService.TestPlugin.class, MockNioTransportPlugin.class);
final Collection<Class<? extends Plugin>> nodePlugins = nodePlugins(clusterAlias);
final Settings nodeSettings = Settings.EMPTY;
final NodeConfigurationSource nodeConfigurationSource = nodeConfigurationSource(nodeSettings, nodePlugins);
final InternalTestCluster cluster = new InternalTestCluster(randomLong(), createTempDir(), true, true, numberOfNodes,
numberOfNodes, clusterName, nodeConfigurationSource, 0, clusterName + "-", mockPlugins, Function.identity());
cluster.beforeTest(random(), 0);
clusters.put(clusterAlias, cluster);
}
clusterGroup = new ClusterGroup(clusters);
configureRemoteClusters();
}
@AfterClass
public static void stopClusters() throws IOException {
IOUtils.close(clusterGroup);
clusterGroup = null;
}
private void configureRemoteClusters() throws Exception {
Map<String, List<String>> seedNodes = new HashMap<>();
for (String clusterAlias : clusterGroup.clusterAliases()) {
if (clusterAlias.equals(LOCAL_CLUSTER) == false) {
final InternalTestCluster cluster = clusterGroup.getCluster(clusterAlias);
final String[] allNodes = cluster.getNodeNames();
final List<String> selectedNodes = randomSubsetOf(randomIntBetween(1, Math.min(3, allNodes.length)), allNodes);
seedNodes.put(clusterAlias, selectedNodes);
}
}
if (seedNodes.isEmpty()) {
return;
}
Settings.Builder settings = Settings.builder();
for (Map.Entry<String, List<String>> entry : seedNodes.entrySet()) {
final String clusterAlias = entry.getKey();
final String seeds = entry.getValue().stream()
.map(node -> cluster(clusterAlias).getInstance(TransportService.class, node).boundAddress().publishAddress().toString())
.collect(Collectors.joining(","));
settings.put("cluster.remote." + clusterAlias + ".seeds", seeds);
}
client().admin().cluster().prepareUpdateSettings().setPersistentSettings(settings).get();
assertBusy(() -> {
List<RemoteConnectionInfo> remoteConnectionInfos = client()
.execute(RemoteInfoAction.INSTANCE, new RemoteInfoRequest()).actionGet().getInfos()
.stream().filter(RemoteConnectionInfo::isConnected)
.collect(Collectors.toList());
final long totalConnections = seedNodes.values().stream().map(List::size).count();
assertThat(remoteConnectionInfos, hasSize(Math.toIntExact(totalConnections)));
});
}
static class ClusterGroup implements Closeable {
private final Map<String, InternalTestCluster> clusters;
ClusterGroup(Map<String, InternalTestCluster> clusters) {
this.clusters = Collections.unmodifiableMap(clusters);
}
InternalTestCluster getCluster(String clusterAlias) {
assertThat(clusters, hasKey(clusterAlias));
return clusters.get(clusterAlias);
}
Set<String> clusterAliases() {
return clusters.keySet();
}
@Override
public void close() throws IOException {
IOUtils.close(clusters.values());
}
}
static NodeConfigurationSource nodeConfigurationSource(Settings nodeSettings, Collection<Class<? extends Plugin>> nodePlugins) {
final Settings.Builder builder = Settings.builder();
builder.putList(DISCOVERY_SEED_HOSTS_SETTING.getKey()); // empty list disables a port scan for other nodes
builder.putList(DISCOVERY_SEED_PROVIDERS_SETTING.getKey(), "file");
builder.put(NetworkModule.TRANSPORT_TYPE_KEY, getTestTransportType());
builder.put(nodeSettings);
return new NodeConfigurationSource() {
@Override
public Settings nodeSettings(int nodeOrdinal) {
return builder.build();
}
@Override
public Path nodeConfigPath(int nodeOrdinal) {
return null;
}
@Override
public Collection<Class<? extends Plugin>> nodePlugins() {
return nodePlugins;
}
};
}
}

View File

@ -0,0 +1,107 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.search;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.AbstractMultiClustersTestCase;
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
import org.elasticsearch.xpack.core.search.action.ClosePointInTimeAction;
import org.elasticsearch.xpack.core.search.action.ClosePointInTimeRequest;
import org.elasticsearch.xpack.core.search.action.OpenPointInTimeAction;
import org.elasticsearch.xpack.core.search.action.OpenPointInTimeRequest;
import org.elasticsearch.xpack.core.search.action.OpenPointInTimeResponse;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
public class CCSPointInTimeIT extends AbstractMultiClustersTestCase {
@Override
protected Collection<String> remoteClusterAlias() {
return Collections.singletonList("remote_cluster");
}
@Override
protected Collection<Class<? extends Plugin>> nodePlugins(String clusterAlias) {
final List<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins(clusterAlias));
plugins.add(LocalStateCompositeXPackPlugin.class);
return plugins;
}
void indexDocs(Client client, String index, int numDocs) {
for (int i = 0; i < numDocs; i++) {
String id = Integer.toString(i);
client.prepareIndex(index, "_doc").setId(id).setSource("value", i).get();
}
client.admin().indices().prepareRefresh(index).get();
}
public void testBasic() {
final Client localClient = client(LOCAL_CLUSTER);
final Client remoteClient = client("remote_cluster");
int localNumDocs = randomIntBetween(10, 50);
assertAcked(localClient.admin().indices().prepareCreate("local_test"));
indexDocs(localClient, "local_test", localNumDocs);
int remoteNumDocs = randomIntBetween(10, 50);
assertAcked(remoteClient.admin().indices().prepareCreate("remote_test"));
indexDocs(remoteClient, "remote_test", remoteNumDocs);
boolean includeLocalIndex = randomBoolean();
List<String> indices = new ArrayList<>();
if (includeLocalIndex) {
indices.add( randomFrom("*", "local_*", "local_test"));
}
indices.add(randomFrom("*:*", "remote_cluster:*", "remote_cluster:remote_test"));
String pitId = openPointInTime(indices.toArray(new String[0]), TimeValue.timeValueMinutes(2));
try {
if (randomBoolean()) {
localClient.prepareIndex("local_test", "_doc").setId("local_new").setSource().get();
localClient.admin().indices().prepareRefresh().get();
}
if (randomBoolean()) {
remoteClient.prepareIndex("remote_test", "_doc").setId("remote_new").setSource().get();
remoteClient.admin().indices().prepareRefresh().get();
}
SearchResponse resp = localClient.prepareSearch()
.setPreference(null)
.setQuery(new MatchAllQueryBuilder())
.setSearchContext(pitId, TimeValue.timeValueMinutes(2))
.setSize(1000)
.get();
assertNoFailures(resp);
assertHitCount(resp, (includeLocalIndex ? localNumDocs : 0) + remoteNumDocs);
} finally {
closePointInTime(pitId);
}
}
private String openPointInTime(String[] indices, TimeValue keepAlive) {
OpenPointInTimeRequest request = new OpenPointInTimeRequest(
indices,
OpenPointInTimeRequest.DEFAULT_INDICES_OPTIONS,
keepAlive,
null,
null
);
final OpenPointInTimeResponse response = client().execute(OpenPointInTimeAction.INSTANCE, request).actionGet();
return response.getSearchContextId();
}
private void closePointInTime(String readerId) {
client().execute(ClosePointInTimeAction.INSTANCE, new ClosePointInTimeRequest(readerId)).actionGet();
}
}

View File

@ -14,6 +14,7 @@ import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.Objects;
public final class OpenPointInTimeResponse extends ActionResponse implements ToXContentObject {
private static final ParseField ID = new ParseField("id");
@ -21,7 +22,7 @@ public final class OpenPointInTimeResponse extends ActionResponse implements ToX
private final String searchContextId;
public OpenPointInTimeResponse(String searchContextId) {
this.searchContextId = searchContextId;
this.searchContextId = Objects.requireNonNull(searchContextId);
}
public OpenPointInTimeResponse(StreamInput in) throws IOException {

View File

@ -72,6 +72,7 @@ public class TransportOpenPointInTimeAction extends HandledTransportAction<OpenP
.preference(request.preference())
.routing(request.routing())
.allowPartialSearchResults(false);
searchRequest.setCcsMinimizeRoundtrips(false);
transportSearchAction.executeRequest(
task,
searchRequest,
@ -91,7 +92,10 @@ public class TransportOpenPointInTimeAction extends HandledTransportAction<OpenP
new ActionListenerResponseHandler<SearchPhaseResult>(phaseListener, ShardOpenReaderResponse::new)
);
},
ActionListener.map(listener, r -> new OpenPointInTimeResponse(r.pointInTimeId()))
ActionListener.map(listener, r -> {
assert r.pointInTimeId() != null : r;
return new OpenPointInTimeResponse(r.pointInTimeId());
})
);
}

View File

@ -10,7 +10,7 @@ dependencies {
restResources {
restApi {
includeXpack 'security', 'async_search', 'indices'
includeXpack 'security', 'async_search', 'indices', 'open_point_in_time', 'close_point_in_time'
}
}

View File

@ -30,13 +30,15 @@
- match: {indices.5.attributes.0: open}
- match: {indices.6.name: my_remote_cluster:field_caps_index_3}
- match: {indices.6.attributes.0: open}
- match: {indices.7.name: my_remote_cluster:secured_via_alias}
- match: {indices.7.attributes.0: open}
- match: {indices.8.name: my_remote_cluster:single_doc_index}
- match: {indices.7.name: my_remote_cluster:point_in_time_index }
- match: {indices.7.attributes.0: open }
- match: {indices.8.name: my_remote_cluster:secured_via_alias}
- match: {indices.8.attributes.0: open}
- match: {indices.9.name: my_remote_cluster:test_index}
- match: {indices.9.aliases.0: aliased_test_index}
- match: {indices.9.attributes.0: open}
- match: {indices.9.name: my_remote_cluster:single_doc_index}
- match: {indices.10.attributes.0: open}
- match: {indices.10.name: my_remote_cluster:test_index}
- match: {indices.10.aliases.0: aliased_test_index}
- match: {indices.10.attributes.0: open}
- match: {aliases.0.name: my_remote_cluster:.security}
- match: {aliases.0.indices.0: .security-7}
- match: {aliases.1.name: my_remote_cluster:aliased_closed_index}

View File

@ -0,0 +1,113 @@
---
setup:
- skip:
features: headers
- do:
cluster.health:
wait_for_status: yellow
- do:
security.put_user:
username: "joe"
body: >
{
"password": "s3krit",
"roles" : [ "x_cluster_role" ]
}
- do:
security.put_role:
name: "x_cluster_role"
body: >
{
"cluster": [],
"indices": [
{
"names": ["local_pit", "my_remote_cluster:point_in_time_index"],
"privileges": ["read"]
}
]
}
- do:
security.put_user:
username: "remote"
body: >
{
"password": "s3krit",
"roles" : [ "remote_ccs" ]
}
- do:
security.put_role:
name: "remote_ccs"
body: >
{
}
---
teardown:
- do:
security.delete_user:
username: "joe"
ignore: 404
- do:
security.delete_role:
name: "x_cluster_role"
ignore: 404
---
"Search with point in time":
- do:
indices.create:
index: local_pit
body:
settings:
index:
number_of_shards: 2
number_of_replicas: 0
mappings:
properties:
created_at:
type: date
format: "yyyy-MM-dd"
- do:
bulk:
refresh: true
body:
- '{"index": {"_index": "local_pit"}}'
- '{"f": "l1", "created_at" : "2020-01-01"}'
- '{"index": {"_index": "local_pit"}}'
- '{"f": "l2", "created_at" : "2021-01-02"}'
- do:
headers: { Authorization: "Basic am9lOnMza3JpdA==" }
open_point_in_time:
index: my_remote_cluster:point_in_time_index,local_pit
keep_alive: 5m
- set: {id: pit_id}
- do:
headers: { Authorization: "Basic am9lOnMza3JpdA==" }
search:
rest_total_hits_as_int: true
sort: created_at
body:
query:
range:
created_at:
gte: "2020-01-03"
pit:
id: "$pit_id"
keep_alive: 1m
- match: { hits.total: 3 }
- match: { hits.hits.0._index: "my_remote_cluster:point_in_time_index" }
- match: { hits.hits.0._source.f: "r3" }
- match: { hits.hits.1._index: "my_remote_cluster:point_in_time_index" }
- match: { hits.hits.1._source.f: "r4" }
- match: { hits.hits.2._index: "local_pit" }
- match: { hits.hits.2._source.f: "l2" }
- do:
headers: { Authorization: "Basic am9lOnMza3JpdA==" }
close_point_in_time:
body:
id: "$pit_id"

View File

@ -23,7 +23,7 @@ setup:
"indices": [
{
"names": ["single_doc_index", "secure_alias", "test_index", "aliased_test_index", "field_caps_index_1",
"field_caps_index_3"],
"field_caps_index_3", "point_in_time_index"],
"privileges": ["read", "read_cross_cluster"]
}
]
@ -46,7 +46,7 @@ setup:
"indices": [
{
"names": ["single_doc_index", "secure_alias", "test_index", "aliased_test_index", "field_caps_index_1",
"field_caps_index_3"],
"field_caps_index_3", "point_in_time_index"],
"privileges": ["read", "read_cross_cluster"]
}
]
@ -272,3 +272,30 @@ setup:
"roles" : [ ]
}
- match: { created: false }
- do:
indices.create:
index: point_in_time_index
body:
settings:
index:
number_of_shards: 2
number_of_replicas: 0
mappings:
properties:
created_at:
type: date
format: "yyyy-MM-dd"
- do:
bulk:
refresh: true
body:
- '{"index": {"_index": "point_in_time_index"}}'
- '{"f": "r1", "created_at" : "2020-01-01"}'
- '{"index": {"_index": "point_in_time_index"}}'
- '{"f": "r2", "created_at" : "2020-01-02"}'
- '{"index": {"_index": "point_in_time_index"}}'
- '{"f": "r3", "created_at" : "2020-01-03"}'
- '{"index": {"_index": "point_in_time_index"}}'
- '{"f": "r4", "created_at" : "2020-01-04"}'