Ensure that local cluster alias is never treated as remote (#37121)

With #36997 we added support for providing a local cluster alias with a
`SearchRequest`. We intended to make sure that when provided as part of
a search request, the cluster alias would never be used for connection
lookups. Yet due to a bug we would still end up looking up the
connection from the remote ones.

This commit adds a test to make sure that whenever we set the cluster
alias to the `SearchRequest` (which can only be done at transport), such
alias is used as index prefix in the returned hits. No errors are thrown
despite no remote clusters are configured indicating that such alias is
never used for connection look-ups.

Also, we add explicit support for the empty cluster alias when printing
 out index names through `RemoteClusterAware#buildRemoteIndexName`.
In fact we don't want to print out `:index` when the cluster alias is
set to empty string, but rather `index`. Yet, the semantic of empty
string is different compared to `null` as it will still disable final
reduction. This will be used in CCS when searching against remote
clusters as well as the local one, the local one will have empty prefix
yet it will need to disable final reduction so that its results will be
properly merged with the ones coming from the remote clusters.
This commit is contained in:
Luca Cavanna 2019-01-04 12:19:31 +01:00 committed by GitHub
parent 0fd27d4d6f
commit 21d52f0dab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 111 additions and 5 deletions

View File

@ -26,6 +26,7 @@ import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.support.HandledTransportAction.ChannelActionListener;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
@ -401,11 +402,11 @@ public class SearchTransportService {
/**
* Returns a connection to the given node on the provided cluster. If the cluster alias is <code>null</code> the node will be resolved
* against the local cluster.
* @param clusterAlias the cluster alias the node should be resolve against
* @param clusterAlias the cluster alias the node should be resolved against
* @param node the node to resolve
* @return a connection to the given node belonging to the cluster with the provided alias.
*/
Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) {
Transport.Connection getConnection(@Nullable String clusterAlias, DiscoveryNode node) {
if (clusterAlias == null) {
return transportService.getConnection(node);
} else {

View File

@ -353,16 +353,19 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
BiFunction<String, DiscoveryNode, Transport.Connection> nodeToConnection) {
return (clusterAlias, nodeId) -> {
final DiscoveryNode discoveryNode;
final boolean remoteCluster;
if (clusterAlias == null || requestClusterAlias != null) {
assert requestClusterAlias == null || requestClusterAlias.equals(clusterAlias);
discoveryNode = localNodes.apply(nodeId);
remoteCluster = false;
} else {
discoveryNode = remoteNodes.apply(clusterAlias, nodeId);
remoteCluster = true;
}
if (discoveryNode == null) {
throw new IllegalStateException("no node found for id: " + nodeId);
}
return nodeToConnection.apply(clusterAlias, discoveryNode);
return nodeToConnection.apply(remoteCluster ? clusterAlias : null, discoveryNode);
};
}

View File

@ -346,7 +346,7 @@ public abstract class RemoteClusterAware {
}
public static String buildRemoteIndexName(String clusterAlias, String indexName) {
return clusterAlias != null ? clusterAlias + REMOTE_CLUSTER_INDEX_SEPARATOR + indexName : indexName;
return clusterAlias == null || LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias)
? indexName : clusterAlias + REMOTE_CLUSTER_INDEX_SEPARATOR + indexName;
}
}

View File

@ -0,0 +1,62 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.search;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.test.ESSingleNodeTestCase;
public class TransportSearchActionSingleNodeTests extends ESSingleNodeTestCase {
public void testLocalClusterAlias() {
IndexRequest indexRequest = new IndexRequest("test");
indexRequest.id("1");
indexRequest.source("field", "value");
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
IndexResponse indexResponse = client().index(indexRequest).actionGet();
assertEquals(RestStatus.CREATED, indexResponse.status());
{
SearchRequest searchRequest = new SearchRequest("local");
SearchResponse searchResponse = client().search(searchRequest).actionGet();
assertEquals(1, searchResponse.getHits().getTotalHits().value);
SearchHit[] hits = searchResponse.getHits().getHits();
assertEquals(1, hits.length);
SearchHit hit = hits[0];
assertEquals("local", hit.getClusterAlias());
assertEquals("test", hit.getIndex());
assertEquals("1", hit.getId());
}
{
SearchRequest searchRequest = new SearchRequest("");
SearchResponse searchResponse = client().search(searchRequest).actionGet();
assertEquals(1, searchResponse.getHits().getTotalHits().value);
SearchHit[] hits = searchResponse.getHits().getHits();
assertEquals(1, hits.length);
SearchHit hit = hits[0];
assertEquals("", hit.getClusterAlias());
assertEquals("test", hit.getIndex());
assertEquals("1", hit.getId());
}
}
}

View File

@ -0,0 +1,40 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport;
import org.elasticsearch.test.ESTestCase;
public class RemoteClusterAwareTests extends ESTestCase {
public void testBuildRemoteIndexName() {
{
String clusterAlias = randomAlphaOfLengthBetween(5, 10);
String index = randomAlphaOfLengthBetween(5, 10);
String remoteIndexName = RemoteClusterAware.buildRemoteIndexName(clusterAlias, index);
assertEquals(clusterAlias + ":" + index, remoteIndexName);
}
{
String index = randomAlphaOfLengthBetween(5, 10);
String clusterAlias = randomBoolean() ? null : RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY;
String remoteIndexName = RemoteClusterAware.buildRemoteIndexName(clusterAlias, index);
assertEquals(index, remoteIndexName);
}
}
}