Fail request if there is a local index that matches the both a remote and local index

This commit is contained in:
Simon Willnauer 2017-01-13 12:22:52 +01:00
parent 6779ea9c2a
commit a8bd57b93c
3 changed files with 23 additions and 8 deletions

View File

@ -88,7 +88,7 @@ public final class RemoteClusterService extends AbstractComponent implements Clo
/** /**
* The name of a node attribute to select nodes that should be connected to in the remote cluster. * The name of a node attribute to select nodes that should be connected to in the remote cluster.
* For instance a node can be configured with <tt>node.node_attr.gateway: true</tt> in order to be eligible as a gateway node between * For instance a node can be configured with <tt>node.attr.gateway: true</tt> in order to be eligible as a gateway node between
* clusters. In that case <tt>search.remote.node_attribute: gateway</tt> can be used to filter out other nodes in the remote cluster. * clusters. In that case <tt>search.remote.node_attribute: gateway</tt> can be used to filter out other nodes in the remote cluster.
* The value of the setting is expected to be a boolean, <tt>true</tt> for nodes that can become gateways, <tt>false</tt> otherwise. * The value of the setting is expected to be a boolean, <tt>true</tt> for nodes that can become gateways, <tt>false</tt> otherwise.
*/ */
@ -177,15 +177,24 @@ public final class RemoteClusterService extends AbstractComponent implements Clo
* *
* @param perClusterIndices a map to fill with remote cluster indices from the given request indices * @param perClusterIndices a map to fill with remote cluster indices from the given request indices
* @param requestIndices the indices in the search request to filter * @param requestIndices the indices in the search request to filter
* @param indexExists a predicate that can test if a certain index or alias exists
*
* @return all indices in the requestIndices array that are not remote cluster indices * @return all indices in the requestIndices array that are not remote cluster indices
*/ */
public String[] filterIndices(Map<String, List<String>> perClusterIndices, String[] requestIndices) { public String[] filterIndices(Map<String, List<String>> perClusterIndices, String[] requestIndices, Predicate<String> indexExists) {
List<String> localIndicesList = new ArrayList<>(); List<String> localIndicesList = new ArrayList<>();
for (String index : requestIndices) { for (String index : requestIndices) {
int i = index.indexOf(REMOTE_CLUSTER_INDEX_SEPARATOR); int i = index.indexOf(REMOTE_CLUSTER_INDEX_SEPARATOR);
if (i >= 0) { if (i >= 0) {
String remoteCluster = index.substring(0, i); String remoteCluster = index.substring(0, i);
if (isRemoteClusterRegistered(remoteCluster)) { if (isRemoteClusterRegistered(remoteCluster)) {
if (indexExists.test(index)) {
// we use : as a separator for remote clusters. might conflict if there is an index that is actually named
// remote_cluster_alias:index_name - for this case we fail the request. the user can easily change the cluster alias
// if that happens
throw new IllegalArgumentException("Index " + index + " exists but there is also a remote cluster named: "
+ remoteCluster + " can't filter indices");
}
String remoteIndex = index.substring(i + 1); String remoteIndex = index.substring(i + 1);
List<String> indices = perClusterIndices.get(remoteCluster); List<String> indices = perClusterIndices.get(remoteCluster);
if (indices == null) { if (indices == null) {

View File

@ -122,9 +122,11 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
final long startTimeInMillis = Math.max(0, System.currentTimeMillis()); final long startTimeInMillis = Math.max(0, System.currentTimeMillis());
final String[] localIndices; final String[] localIndices;
final Map<String, List<String>> remoteIndicesByCluster; final Map<String, List<String>> remoteIndicesByCluster;
final ClusterState clusterState = clusterService.state();
if (remoteClusterService.isCrossClusterSearchEnabled()) { if (remoteClusterService.isCrossClusterSearchEnabled()) {
remoteIndicesByCluster = new HashMap<>(); remoteIndicesByCluster = new HashMap<>();
localIndices = remoteClusterService.filterIndices(remoteIndicesByCluster, searchRequest.indices()); localIndices = remoteClusterService.filterIndices(remoteIndicesByCluster, searchRequest.indices(),
idx -> indexNameExpressionResolver.hasIndexOrAlias(idx, clusterState));
} else { } else {
remoteIndicesByCluster = Collections.emptyMap(); remoteIndicesByCluster = Collections.emptyMap();
localIndices = searchRequest.indices(); localIndices = searchRequest.indices();
@ -132,7 +134,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
if (remoteIndicesByCluster.isEmpty()) { if (remoteIndicesByCluster.isEmpty()) {
executeSearch((SearchTask)task, startTimeInMillis, searchRequest, localIndices, Collections.emptyList(), executeSearch((SearchTask)task, startTimeInMillis, searchRequest, localIndices, Collections.emptyList(),
(nodeId) -> null, Collections.emptyMap(), listener); (nodeId) -> null, clusterState, Collections.emptyMap(), listener);
} else { } else {
remoteClusterService.collectSearchShards(searchRequest, remoteIndicesByCluster, remoteClusterService.collectSearchShards(searchRequest, remoteIndicesByCluster,
ActionListener.wrap((searchShardsResponses) -> { ActionListener.wrap((searchShardsResponses) -> {
@ -141,16 +143,16 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
Function<String, Transport.Connection> connectionFunction = remoteClusterService.processRemoteShards( Function<String, Transport.Connection> connectionFunction = remoteClusterService.processRemoteShards(
searchShardsResponses, remoteShardIterators, remoteAliasFilters); searchShardsResponses, remoteShardIterators, remoteAliasFilters);
executeSearch((SearchTask)task, startTimeInMillis, searchRequest, localIndices, remoteShardIterators, executeSearch((SearchTask)task, startTimeInMillis, searchRequest, localIndices, remoteShardIterators,
connectionFunction, remoteAliasFilters, listener); connectionFunction, clusterState, remoteAliasFilters, listener);
}, listener::onFailure)); }, listener::onFailure));
} }
} }
private void executeSearch(SearchTask task, long startTimeInMillis, SearchRequest searchRequest, String[] localIndices, private void executeSearch(SearchTask task, long startTimeInMillis, SearchRequest searchRequest, String[] localIndices,
List<ShardIterator> remoteShardIterators, Function<String, Transport.Connection> remoteConnections, List<ShardIterator> remoteShardIterators, Function<String, Transport.Connection> remoteConnections,
Map<String, AliasFilter> remoteAliasMap, ActionListener<SearchResponse> listener) { ClusterState clusterState, Map<String, AliasFilter> remoteAliasMap,
ActionListener<SearchResponse> listener) {
ClusterState clusterState = clusterService.state();
clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ); clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ);
// TODO: I think startTime() should become part of ActionRequest and that should be used both for index name // TODO: I think startTime() should become part of ActionRequest and that should be used both for index name
// date math expressions and $now in scripts. This way all apis will deal with now in the same way instead // date math expressions and $now in scripts. This way all apis will deal with now in the same way instead

View File

@ -144,11 +144,15 @@ public class RemoteClusterServiceTests extends ESTestCase {
assertFalse(service.isRemoteClusterRegistered("foo")); assertFalse(service.isRemoteClusterRegistered("foo"));
Map<String, List<String>> perClusterIndices = new HashMap<>(); Map<String, List<String>> perClusterIndices = new HashMap<>();
String[] localIndices = service.filterIndices(perClusterIndices, new String[]{"foo:bar", "cluster_1:bar", String[] localIndices = service.filterIndices(perClusterIndices, new String[]{"foo:bar", "cluster_1:bar",
"cluster_2:foo:bar", "cluster_1:test", "cluster_2:foo*", "foo"}); "cluster_2:foo:bar", "cluster_1:test", "cluster_2:foo*", "foo"}, i -> false);
assertArrayEquals(new String[]{"foo:bar", "foo"}, localIndices); assertArrayEquals(new String[]{"foo:bar", "foo"}, localIndices);
assertEquals(2, perClusterIndices.size()); assertEquals(2, perClusterIndices.size());
assertEquals(Arrays.asList("bar", "test"), perClusterIndices.get("cluster_1")); assertEquals(Arrays.asList("bar", "test"), perClusterIndices.get("cluster_1"));
assertEquals(Arrays.asList("foo:bar", "foo*"), perClusterIndices.get("cluster_2")); assertEquals(Arrays.asList("foo:bar", "foo*"), perClusterIndices.get("cluster_2"));
expectThrows(IllegalArgumentException.class, () ->
service.filterIndices(perClusterIndices, new String[]{"foo:bar", "cluster_1:bar",
"cluster_2:foo:bar", "cluster_1:test", "cluster_2:foo*", "foo"}, i -> "cluster_1:bar".equals(i)));
} }
} }
} }