Improve remote / local indices filtering by not modifying external state

This commit is contained in:
Simon Willnauer 2017-01-17 14:05:36 +01:00
parent 709cb9a39e
commit 88f6ae55f5
3 changed files with 36 additions and 41 deletions

View File

@ -67,6 +67,8 @@ import java.util.stream.Stream;
*/
public final class RemoteClusterService extends AbstractComponent implements Closeable {
public static final String LOCAL_CLUSTER_GROUP_KEY = "";
/**
* A list of initial seed nodes to discover eligible nodes from the remote cluster
*/
@ -113,6 +115,9 @@ public final class RemoteClusterService extends AbstractComponent implements Clo
* @param connectionListener a listener invoked once every configured cluster has been connected to
*/
private synchronized void updateRemoteClusters(Map<String, List<DiscoveryNode>> seeds, ActionListener<Void> connectionListener) {
if (seeds.containsKey(LOCAL_CLUSTER_GROUP_KEY)) {
throw new IllegalArgumentException("remote clusters must have the empty string as it's key");
}
Map<String, RemoteClusterConnection> remoteClusters = new HashMap<>();
if (seeds.isEmpty()) {
connectionListener.onResponse(null);
@ -173,43 +178,38 @@ public final class RemoteClusterService extends AbstractComponent implements Clo
}
/**
* Filters out indices that refer to a remote cluster and adds them to the given per cluster indices map.
* Groups indices per cluster by splitting remote cluster-alias, index-name pairs on {@link #REMOTE_CLUSTER_INDEX_SEPARATOR}. All
* indices per cluster are collected as a list in the returned map keyed by the cluster alias. Local indices are grouped under
* {@link #LOCAL_CLUSTER_GROUP_KEY}. The returned map is mutable.
*
* @param perClusterIndices a map to fill with remote cluster indices from the given request indices
* @param requestIndices the indices in the search request to filter
* @param indexExists a predicate that can test if a certain index or alias exists
*
* @return all indices in the requestIndices array that are not remote cluster indices
* @return a map of grouped remote and local indices
*/
String[] filterIndices(Map<String, List<String>> perClusterIndices, String[] requestIndices, Predicate<String> indexExists) {
List<String> localIndicesList = new ArrayList<>();
Map<String, List<String>> groupClusterIndices(String[] requestIndices, Predicate<String> indexExists) {
Map<String, List<String>> perClusterIndices = new HashMap<>();
for (String index : requestIndices) {
int i = index.indexOf(REMOTE_CLUSTER_INDEX_SEPARATOR);
String indexName = index;
String clusterName = LOCAL_CLUSTER_GROUP_KEY;
if (i >= 0) {
String remoteCluster = index.substring(0, i);
if (isRemoteClusterRegistered(remoteCluster)) {
String remoteClusterName = index.substring(0, i);
if (isRemoteClusterRegistered(remoteClusterName)) {
if (indexExists.test(index)) {
// we use : as a separator for remote clusters. might conflict if there is an index that is actually named
// remote_cluster_alias:index_name - for this case we fail the request. the user can easily change the cluster alias
// if that happens
throw new IllegalArgumentException("Index " + index + " exists but there is also a remote cluster named: "
+ remoteCluster + " can't filter indices");
+ clusterName + " can't filter indices");
}
String remoteIndex = index.substring(i + 1);
List<String> indices = perClusterIndices.get(remoteCluster);
if (indices == null) {
indices = new ArrayList<>();
perClusterIndices.put(remoteCluster, indices);
}
indices.add(remoteIndex);
} else {
localIndicesList.add(index);
indexName = index.substring(i + 1);
clusterName = remoteClusterName;
}
} else {
localIndicesList.add(index);
}
perClusterIndices.computeIfAbsent(clusterName, k -> new ArrayList<String>()).add(indexName);
}
return localIndicesList.toArray(new String[localIndicesList.size()]);
return perClusterIndices;
}
/**

View File

@ -30,6 +30,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
@ -121,22 +122,23 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
// pure paranoia if time goes backwards we are at least positive
final long startTimeInMillis = Math.max(0, System.currentTimeMillis());
final String[] localIndices;
final Map<String, List<String>> remoteIndicesByCluster;
final Map<String, List<String>> remoteClusterIndices;
final ClusterState clusterState = clusterService.state();
if (remoteClusterService.isCrossClusterSearchEnabled()) {
remoteIndicesByCluster = new HashMap<>();
localIndices = remoteClusterService.filterIndices(remoteIndicesByCluster, searchRequest.indices(),
remoteClusterIndices = remoteClusterService.groupClusterIndices( searchRequest.indices(), // empty string is not allowed
idx -> indexNameExpressionResolver.hasIndexOrAlias(idx, clusterState));
List<String> remove = remoteClusterIndices.remove(RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY);
localIndices = remove == null ? Strings.EMPTY_ARRAY : remove.toArray(new String[remove.size()]);
} else {
remoteIndicesByCluster = Collections.emptyMap();
remoteClusterIndices = Collections.emptyMap();
localIndices = searchRequest.indices();
}
if (remoteIndicesByCluster.isEmpty()) {
if (remoteClusterIndices.isEmpty()) {
executeSearch((SearchTask)task, startTimeInMillis, searchRequest, localIndices, Collections.emptyList(),
(nodeId) -> null, clusterState, Collections.emptyMap(), listener);
} else {
remoteClusterService.collectSearchShards(searchRequest, remoteIndicesByCluster,
remoteClusterService.collectSearchShards(searchRequest, remoteClusterIndices,
ActionListener.wrap((searchShardsResponses) -> {
List<ShardIterator> remoteShardIterators = new ArrayList<>();
Map<String, AliasFilter> remoteAliasFilters = new HashMap<>();

View File

@ -19,20 +19,11 @@
package org.elasticsearch.action.search;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsAction;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingHelper;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.common.Strings;
@ -47,7 +38,6 @@ import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.Transport;
import java.io.IOException;
import java.net.InetAddress;
@ -60,7 +50,6 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
public class RemoteClusterServiceTests extends ESTestCase {
@ -118,7 +107,7 @@ public class RemoteClusterServiceTests extends ESTestCase {
}
public void testFilterIndices() throws IOException {
public void testGroupClusterIndices() throws IOException {
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
try (MockTransportService seedTransport = startTransport("cluster_1_node", knownNodes, Version.CURRENT);
MockTransportService otherSeedTransport = startTransport("cluster_2_node", knownNodes, Version.CURRENT)) {
@ -142,16 +131,18 @@ public class RemoteClusterServiceTests extends ESTestCase {
assertTrue(service.isRemoteClusterRegistered("cluster_1"));
assertTrue(service.isRemoteClusterRegistered("cluster_2"));
assertFalse(service.isRemoteClusterRegistered("foo"));
Map<String, List<String>> perClusterIndices = new HashMap<>();
String[] localIndices = service.filterIndices(perClusterIndices, new String[]{"foo:bar", "cluster_1:bar",
Map<String, List<String>> perClusterIndices = service.groupClusterIndices(new String[]{"foo:bar", "cluster_1:bar",
"cluster_2:foo:bar", "cluster_1:test", "cluster_2:foo*", "foo"}, i -> false);
String[] localIndices = perClusterIndices.computeIfAbsent(RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY,
k -> Collections.emptyList()).toArray(new String[0]);
assertNotNull(perClusterIndices.remove(RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY));
assertArrayEquals(new String[]{"foo:bar", "foo"}, localIndices);
assertEquals(2, perClusterIndices.size());
assertEquals(Arrays.asList("bar", "test"), perClusterIndices.get("cluster_1"));
assertEquals(Arrays.asList("foo:bar", "foo*"), perClusterIndices.get("cluster_2"));
expectThrows(IllegalArgumentException.class, () ->
service.filterIndices(perClusterIndices, new String[]{"foo:bar", "cluster_1:bar",
service.groupClusterIndices(new String[]{"foo:bar", "cluster_1:bar",
"cluster_2:foo:bar", "cluster_1:test", "cluster_2:foo*", "foo"}, i -> "cluster_1:bar".equals(i)));
}
}
@ -188,6 +179,8 @@ public class RemoteClusterServiceTests extends ESTestCase {
assertTrue(service.isRemoteClusterRegistered("cluster_2"));
service.updateRemoteCluster("cluster_2", Collections.emptyList());
assertFalse(service.isRemoteClusterRegistered("cluster_2"));
expectThrows(IllegalArgumentException.class,
() -> service.updateRemoteCluster(RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY, Collections.emptyList()));
}
}
}