Ensure remote cluster alias is preserved in inner hits aggs (#25627)
We lost the cluster alias due to some special caseing in inner hits and due to the fact that we didn't pass on the alias to the shard request. This change ensures that we have the cluster alias present on the shard to ensure all SearchShardTarget reads preserve the alias. Relates to #25606
This commit is contained in:
parent
977712f977
commit
ec1afe30ea
|
@ -31,6 +31,7 @@ import org.elasticsearch.action.support.TransportActions;
|
|||
import org.elasticsearch.cluster.routing.GroupShardsIterator;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.util.concurrent.AtomicArray;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.search.SearchPhaseResult;
|
||||
import org.elasticsearch.search.SearchShardTarget;
|
||||
import org.elasticsearch.search.internal.AliasFilter;
|
||||
|
@ -297,11 +298,12 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
|
|||
}
|
||||
|
||||
public final ShardSearchTransportRequest buildShardSearchRequest(SearchShardIterator shardIt) {
|
||||
String clusterAlias = shardIt.getClusterAlias();
|
||||
AliasFilter filter = aliasFilter.get(shardIt.shardId().getIndex().getUUID());
|
||||
assert filter != null;
|
||||
float indexBoost = concreteIndexBoosts.getOrDefault(shardIt.shardId().getIndex().getUUID(), DEFAULT_INDEX_BOOST);
|
||||
return new ShardSearchTransportRequest(shardIt.getOriginalIndices(), request, shardIt.shardId(), getNumShards(),
|
||||
filter, indexBoost, timeProvider.getAbsoluteStartMillis());
|
||||
filter, indexBoost, timeProvider.getAbsoluteStartMillis(), clusterAlias);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -216,9 +216,6 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
|
|||
//add the cluster name to the remote index names for indices disambiguation
|
||||
//this ends up in the hits returned with the search response
|
||||
ShardId shardId = clusterSearchShardsGroup.getShardId();
|
||||
Index remoteIndex = shardId.getIndex();
|
||||
Index index = new Index(RemoteClusterAware.buildRemoteIndexName(clusterAlias, remoteIndex.getName()),
|
||||
remoteIndex.getUUID());
|
||||
final AliasFilter aliasFilter;
|
||||
if (indicesAndFilters == null) {
|
||||
aliasFilter = AliasFilter.EMPTY;
|
||||
|
@ -229,10 +226,10 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
|
|||
String[] aliases = aliasFilter.getAliases();
|
||||
String[] finalIndices = aliases.length == 0 ? new String[] {shardId.getIndexName()} : aliases;
|
||||
// here we have to map the filters to the UUID since from now on we use the uuid for the lookup
|
||||
aliasFilterMap.put(remoteIndex.getUUID(), aliasFilter);
|
||||
aliasFilterMap.put(shardId.getIndex().getUUID(), aliasFilter);
|
||||
final OriginalIndices originalIndices = remoteIndicesByCluster.get(clusterAlias);
|
||||
assert originalIndices != null : "original indices are null for clusterAlias: " + clusterAlias;
|
||||
SearchShardIterator shardIterator = new SearchShardIterator(clusterAlias, new ShardId(index, shardId.getId()),
|
||||
SearchShardIterator shardIterator = new SearchShardIterator(clusterAlias, shardId,
|
||||
Arrays.asList(clusterSearchShardsGroup.getShards()), new OriginalIndices(finalIndices,
|
||||
originalIndices.indicesOptions()));
|
||||
remoteShardIterators.add(shardIterator);
|
||||
|
|
|
@ -49,6 +49,7 @@ import org.elasticsearch.index.shard.ShardId;
|
|||
import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;
|
||||
import org.elasticsearch.search.lookup.SourceLookup;
|
||||
import org.elasticsearch.search.suggest.completion.CompletionSuggestion;
|
||||
import org.elasticsearch.transport.RemoteClusterAware;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
|
@ -106,6 +107,7 @@ public final class SearchHit implements Streamable, ToXContentObject, Iterable<D
|
|||
private SearchShardTarget shard;
|
||||
|
||||
private transient String index;
|
||||
private transient String clusterAlias;
|
||||
|
||||
private Map<String, Object> sourceAsMap;
|
||||
|
||||
|
@ -329,9 +331,17 @@ public final class SearchHit implements Streamable, ToXContentObject, Iterable<D
|
|||
this.shard = target;
|
||||
if (target != null) {
|
||||
this.index = target.getIndex();
|
||||
this.clusterAlias = target.getClusterAlias();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the cluster alias this hit comes from or null if it comes from a local cluster
|
||||
*/
|
||||
public String getClusterAlias() {
|
||||
return clusterAlias;
|
||||
}
|
||||
|
||||
public void matchedQueries(String[] matchedQueries) {
|
||||
this.matchedQueries = matchedQueries;
|
||||
}
|
||||
|
@ -408,7 +418,7 @@ public final class SearchHit implements Streamable, ToXContentObject, Iterable<D
|
|||
nestedIdentity.toXContent(builder, params);
|
||||
} else {
|
||||
if (index != null) {
|
||||
builder.field(Fields._INDEX, index);
|
||||
builder.field(Fields._INDEX, RemoteClusterAware.buildRemoteIndexName(clusterAlias, index));
|
||||
}
|
||||
if (type != null) {
|
||||
builder.field(Fields._TYPE, type);
|
||||
|
|
|
@ -61,12 +61,6 @@ public final class SearchHits implements Streamable, ToXContent, Iterable<Search
|
|||
this.maxScore = maxScore;
|
||||
}
|
||||
|
||||
public void shardTarget(SearchShardTarget shardTarget) {
|
||||
for (SearchHit hit : hits) {
|
||||
hit.shard(shardTarget);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* The total number of hits that matches the search request.
|
||||
*/
|
||||
|
|
|
@ -512,7 +512,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
|
|||
IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
|
||||
IndexShard indexShard = indexService.getShard(request.shardId().getId());
|
||||
SearchShardTarget shardTarget = new SearchShardTarget(clusterService.localNode().getId(),
|
||||
indexShard.shardId(), null, OriginalIndices.NONE);
|
||||
indexShard.shardId(), request.getClusterAlias(), OriginalIndices.NONE);
|
||||
Engine.Searcher engineSearcher = searcher == null ? indexShard.acquireSearcher("search") : searcher;
|
||||
|
||||
final DefaultSearchContext searchContext = new DefaultSearchContext(idGenerator.incrementAndGet(), request, shardTarget,
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
|
||||
package org.elasticsearch.search;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.OriginalIndices;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
|
@ -27,6 +28,7 @@ import org.elasticsearch.common.io.stream.Writeable;
|
|||
import org.elasticsearch.common.text.Text;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.transport.RemoteClusterAware;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
|
@ -40,7 +42,7 @@ public final class SearchShardTarget implements Writeable, Comparable<SearchShar
|
|||
//original indices and cluster alias are only needed in the coordinating node throughout the search request execution.
|
||||
//no need to serialize them as part of SearchShardTarget.
|
||||
private final transient OriginalIndices originalIndices;
|
||||
private final transient String clusterAlias;
|
||||
private final String clusterAlias;
|
||||
|
||||
public SearchShardTarget(StreamInput in) throws IOException {
|
||||
if (in.readBoolean()) {
|
||||
|
@ -50,7 +52,11 @@ public final class SearchShardTarget implements Writeable, Comparable<SearchShar
|
|||
}
|
||||
shardId = ShardId.readShardId(in);
|
||||
this.originalIndices = null;
|
||||
this.clusterAlias = null;
|
||||
if (in.getVersion().onOrAfter(Version.V_6_0_0_beta1)) {
|
||||
clusterAlias = in.readOptionalString();
|
||||
} else {
|
||||
clusterAlias = null;
|
||||
}
|
||||
}
|
||||
|
||||
public SearchShardTarget(String nodeId, ShardId shardId, String clusterAlias, OriginalIndices originalIndices) {
|
||||
|
@ -61,8 +67,8 @@ public final class SearchShardTarget implements Writeable, Comparable<SearchShar
|
|||
}
|
||||
|
||||
//this constructor is only used in tests
|
||||
public SearchShardTarget(String nodeId, Index index, int shardId) {
|
||||
this(nodeId, new ShardId(index, shardId), null, OriginalIndices.NONE);
|
||||
public SearchShardTarget(String nodeId, Index index, int shardId, String clusterAlias) {
|
||||
this(nodeId, new ShardId(index, shardId), clusterAlias, OriginalIndices.NONE);
|
||||
}
|
||||
|
||||
@Nullable
|
||||
|
@ -108,6 +114,9 @@ public final class SearchShardTarget implements Writeable, Comparable<SearchShar
|
|||
out.writeText(nodeId);
|
||||
}
|
||||
shardId.writeTo(out);
|
||||
if (out.getVersion().onOrAfter(Version.V_6_0_0_beta1)) {
|
||||
out.writeOptionalString(clusterAlias);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -117,7 +126,7 @@ public final class SearchShardTarget implements Writeable, Comparable<SearchShar
|
|||
SearchShardTarget that = (SearchShardTarget) o;
|
||||
if (shardId.equals(that.shardId) == false) return false;
|
||||
if (nodeId != null ? !nodeId.equals(that.nodeId) : that.nodeId != null) return false;
|
||||
|
||||
if (clusterAlias != null ? !clusterAlias.equals(that.clusterAlias) : that.clusterAlias != null) return false;
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -126,14 +135,17 @@ public final class SearchShardTarget implements Writeable, Comparable<SearchShar
|
|||
int result = nodeId != null ? nodeId.hashCode() : 0;
|
||||
result = 31 * result + (shardId.getIndexName() != null ? shardId.getIndexName().hashCode() : 0);
|
||||
result = 31 * result + shardId.hashCode();
|
||||
result = 31 * result + (clusterAlias != null ? clusterAlias.hashCode() : 0);
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
String shardToString = "[" + RemoteClusterAware.buildRemoteIndexName(clusterAlias, shardId.getIndexName()) + "][" + shardId.getId()
|
||||
+ "]";
|
||||
if (nodeId == null) {
|
||||
return "[_na_]" + shardId;
|
||||
return "[_na_]" + shardToString;
|
||||
}
|
||||
return "[" + nodeId + "]" + shardId;
|
||||
return "[" + nodeId + "]" + shardToString;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -59,6 +59,7 @@ import java.util.Optional;
|
|||
|
||||
public class ShardSearchLocalRequest implements ShardSearchRequest {
|
||||
|
||||
private String clusterAlias;
|
||||
private ShardId shardId;
|
||||
private int numberOfShards;
|
||||
private SearchType searchType;
|
||||
|
@ -76,11 +77,12 @@ public class ShardSearchLocalRequest implements ShardSearchRequest {
|
|||
}
|
||||
|
||||
ShardSearchLocalRequest(SearchRequest searchRequest, ShardId shardId, int numberOfShards,
|
||||
AliasFilter aliasFilter, float indexBoost, long nowInMillis) {
|
||||
AliasFilter aliasFilter, float indexBoost, long nowInMillis, String clusterAlias) {
|
||||
this(shardId, numberOfShards, searchRequest.searchType(),
|
||||
searchRequest.source(), searchRequest.types(), searchRequest.requestCache(), aliasFilter, indexBoost);
|
||||
this.scroll = searchRequest.scroll();
|
||||
this.nowInMillis = nowInMillis;
|
||||
this.clusterAlias = clusterAlias;
|
||||
}
|
||||
|
||||
public ShardSearchLocalRequest(ShardId shardId, String[] types, long nowInMillis, AliasFilter aliasFilter) {
|
||||
|
@ -197,6 +199,9 @@ public class ShardSearchLocalRequest implements ShardSearchRequest {
|
|||
}
|
||||
nowInMillis = in.readVLong();
|
||||
requestCache = in.readOptionalBoolean();
|
||||
if (in.getVersion().onOrAfter(Version.V_6_0_0_beta1)) {
|
||||
clusterAlias = in.readOptionalString();
|
||||
}
|
||||
}
|
||||
|
||||
protected void innerWriteTo(StreamOutput out, boolean asKey) throws IOException {
|
||||
|
@ -216,6 +221,9 @@ public class ShardSearchLocalRequest implements ShardSearchRequest {
|
|||
out.writeVLong(nowInMillis);
|
||||
}
|
||||
out.writeOptionalBoolean(requestCache);
|
||||
if (out.getVersion().onOrAfter(Version.V_6_0_0_beta1)) {
|
||||
out.writeOptionalString(clusterAlias);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -238,4 +246,9 @@ public class ShardSearchLocalRequest implements ShardSearchRequest {
|
|||
}
|
||||
this.source = source;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getClusterAlias() {
|
||||
return clusterAlias;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -141,4 +141,10 @@ public interface ShardSearchRequest {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the cluster alias if this request is for a remote cluster or <code>null</code> if the request if targeted to the local
|
||||
* cluster.
|
||||
*/
|
||||
String getClusterAlias();
|
||||
|
||||
}
|
||||
|
|
|
@ -54,9 +54,9 @@ public class ShardSearchTransportRequest extends TransportRequest implements Sha
|
|||
}
|
||||
|
||||
public ShardSearchTransportRequest(OriginalIndices originalIndices, SearchRequest searchRequest, ShardId shardId, int numberOfShards,
|
||||
AliasFilter aliasFilter, float indexBoost, long nowInMillis) {
|
||||
AliasFilter aliasFilter, float indexBoost, long nowInMillis, String clusterAlias) {
|
||||
this.shardSearchLocalRequest = new ShardSearchLocalRequest(searchRequest, shardId, numberOfShards, aliasFilter, indexBoost,
|
||||
nowInMillis);
|
||||
nowInMillis, clusterAlias);
|
||||
this.originalIndices = originalIndices;
|
||||
}
|
||||
|
||||
|
@ -141,6 +141,7 @@ public class ShardSearchTransportRequest extends TransportRequest implements Sha
|
|||
shardSearchLocalRequest = new ShardSearchLocalRequest();
|
||||
shardSearchLocalRequest.innerReadFrom(in);
|
||||
originalIndices = OriginalIndices.readOriginalIndices(in);
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -180,4 +181,9 @@ public class ShardSearchTransportRequest extends TransportRequest implements Sha
|
|||
// Shard id is enough here, the request itself can be found by looking at the parent task description
|
||||
return "shardId[" + shardSearchLocalRequest.shardId() + "]";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getClusterAlias() {
|
||||
return shardSearchLocalRequest.getClusterAlias();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -161,6 +161,6 @@ public abstract class RemoteClusterAware extends AbstractComponent {
|
|||
}
|
||||
|
||||
public static final String buildRemoteIndexName(String clusterAlias, String indexName) {
|
||||
return clusterAlias + REMOTE_CLUSTER_INDEX_SEPARATOR + indexName;
|
||||
return clusterAlias != null ? clusterAlias + REMOTE_CLUSTER_INDEX_SEPARATOR + indexName : indexName;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -114,9 +114,9 @@ public class ElasticsearchExceptionTests extends ESTestCase {
|
|||
assertEquals(ElasticsearchException.getExceptionName(rootCauses[0]), "index_not_found_exception");
|
||||
assertEquals(rootCauses[0].getMessage(), "no such index");
|
||||
ShardSearchFailure failure = new ShardSearchFailure(new ParsingException(1, 2, "foobar", null),
|
||||
new SearchShardTarget("node_1", new Index("foo", "_na_"), 1));
|
||||
new SearchShardTarget("node_1", new Index("foo", "_na_"), 1, null));
|
||||
ShardSearchFailure failure1 = new ShardSearchFailure(new ParsingException(1, 2, "foobar", null),
|
||||
new SearchShardTarget("node_1", new Index("foo", "_na_"), 2));
|
||||
new SearchShardTarget("node_1", new Index("foo", "_na_"), 2, null));
|
||||
SearchPhaseExecutionException ex = new SearchPhaseExecutionException("search", "all shards failed",
|
||||
new ShardSearchFailure[]{failure, failure1});
|
||||
if (randomBoolean()) {
|
||||
|
@ -135,11 +135,11 @@ public class ElasticsearchExceptionTests extends ESTestCase {
|
|||
{
|
||||
ShardSearchFailure failure = new ShardSearchFailure(
|
||||
new ParsingException(1, 2, "foobar", null),
|
||||
new SearchShardTarget("node_1", new Index("foo", "_na_"), 1));
|
||||
new SearchShardTarget("node_1", new Index("foo", "_na_"), 1, null));
|
||||
ShardSearchFailure failure1 = new ShardSearchFailure(new QueryShardException(new Index("foo1", "_na_"), "foobar", null),
|
||||
new SearchShardTarget("node_1", new Index("foo1", "_na_"), 1));
|
||||
new SearchShardTarget("node_1", new Index("foo1", "_na_"), 1, null));
|
||||
ShardSearchFailure failure2 = new ShardSearchFailure(new QueryShardException(new Index("foo1", "_na_"), "foobar", null),
|
||||
new SearchShardTarget("node_1", new Index("foo1", "_na_"), 2));
|
||||
new SearchShardTarget("node_1", new Index("foo1", "_na_"), 2, null));
|
||||
SearchPhaseExecutionException ex = new SearchPhaseExecutionException("search", "all shards failed",
|
||||
new ShardSearchFailure[]{failure, failure1, failure2});
|
||||
final ElasticsearchException[] rootCauses = ex.guessRootCauses();
|
||||
|
@ -166,9 +166,9 @@ public class ElasticsearchExceptionTests extends ESTestCase {
|
|||
public void testDeduplicate() throws IOException {
|
||||
{
|
||||
ShardSearchFailure failure = new ShardSearchFailure(new ParsingException(1, 2, "foobar", null),
|
||||
new SearchShardTarget("node_1", new Index("foo", "_na_"), 1));
|
||||
new SearchShardTarget("node_1", new Index("foo", "_na_"), 1, null));
|
||||
ShardSearchFailure failure1 = new ShardSearchFailure(new ParsingException(1, 2, "foobar", null),
|
||||
new SearchShardTarget("node_1", new Index("foo", "_na_"), 2));
|
||||
new SearchShardTarget("node_1", new Index("foo", "_na_"), 2, null));
|
||||
SearchPhaseExecutionException ex = new SearchPhaseExecutionException("search", "all shards failed",
|
||||
randomBoolean() ? failure1.getCause() : failure.getCause(), new ShardSearchFailure[]{failure, failure1});
|
||||
XContentBuilder builder = XContentFactory.jsonBuilder();
|
||||
|
@ -182,11 +182,11 @@ public class ElasticsearchExceptionTests extends ESTestCase {
|
|||
}
|
||||
{
|
||||
ShardSearchFailure failure = new ShardSearchFailure(new ParsingException(1, 2, "foobar", null),
|
||||
new SearchShardTarget("node_1", new Index("foo", "_na_"), 1));
|
||||
new SearchShardTarget("node_1", new Index("foo", "_na_"), 1, null));
|
||||
ShardSearchFailure failure1 = new ShardSearchFailure(new QueryShardException(new Index("foo1", "_na_"), "foobar", null),
|
||||
new SearchShardTarget("node_1", new Index("foo1", "_na_"), 1));
|
||||
new SearchShardTarget("node_1", new Index("foo1", "_na_"), 1, null));
|
||||
ShardSearchFailure failure2 = new ShardSearchFailure(new QueryShardException(new Index("foo1", "_na_"), "foobar", null),
|
||||
new SearchShardTarget("node_1", new Index("foo1", "_na_"), 2));
|
||||
new SearchShardTarget("node_1", new Index("foo1", "_na_"), 2, null));
|
||||
SearchPhaseExecutionException ex = new SearchPhaseExecutionException("search", "all shards failed",
|
||||
new ShardSearchFailure[]{failure, failure1, failure2});
|
||||
XContentBuilder builder = XContentFactory.jsonBuilder();
|
||||
|
@ -202,9 +202,9 @@ public class ElasticsearchExceptionTests extends ESTestCase {
|
|||
}
|
||||
{
|
||||
ShardSearchFailure failure = new ShardSearchFailure(new ParsingException(1, 2, "foobar", null),
|
||||
new SearchShardTarget("node_1", new Index("foo", "_na_"), 1));
|
||||
new SearchShardTarget("node_1", new Index("foo", "_na_"), 1, null));
|
||||
ShardSearchFailure failure1 = new ShardSearchFailure(new ParsingException(1, 2, "foobar", null),
|
||||
new SearchShardTarget("node_1", new Index("foo", "_na_"), 2));
|
||||
new SearchShardTarget("node_1", new Index("foo", "_na_"), 2, null));
|
||||
NullPointerException nullPointerException = new NullPointerException();
|
||||
SearchPhaseExecutionException ex = new SearchPhaseExecutionException("search", "all shards failed", nullPointerException,
|
||||
new ShardSearchFailure[]{failure, failure1});
|
||||
|
@ -891,7 +891,7 @@ public class ElasticsearchExceptionTests extends ESTestCase {
|
|||
actual = new SearchPhaseExecutionException("search", "all shards failed",
|
||||
new ShardSearchFailure[]{
|
||||
new ShardSearchFailure(new ParsingException(1, 2, "foobar", null),
|
||||
new SearchShardTarget("node_1", new Index("foo", "_na_"), 1))
|
||||
new SearchShardTarget("node_1", new Index("foo", "_na_"), 1, null))
|
||||
});
|
||||
expected = new ElasticsearchException("Elasticsearch exception [type=search_phase_execution_exception, " +
|
||||
"reason=all shards failed]");
|
||||
|
|
|
@ -65,7 +65,6 @@ import org.elasticsearch.index.shard.ShardId;
|
|||
import org.elasticsearch.indices.IndexTemplateMissingException;
|
||||
import org.elasticsearch.indices.InvalidIndexTemplateException;
|
||||
import org.elasticsearch.indices.recovery.RecoverFilesRecoveryException;
|
||||
import org.elasticsearch.indices.recovery.RecoveryTarget;
|
||||
import org.elasticsearch.repositories.RepositoryException;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.rest.action.admin.indices.AliasesNotFoundException;
|
||||
|
@ -284,7 +283,7 @@ public class ExceptionSerializationTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testSearchException() throws IOException {
|
||||
SearchShardTarget target = new SearchShardTarget("foo", new Index("bar", "_na_"), 1);
|
||||
SearchShardTarget target = new SearchShardTarget("foo", new Index("bar", "_na_"), 1, null);
|
||||
SearchException ex = serialize(new SearchException(target, "hello world"));
|
||||
assertEquals(target, ex.shard());
|
||||
assertEquals(ex.getMessage(), "hello world");
|
||||
|
|
|
@ -61,13 +61,13 @@ public class CountedCollectorTests extends ESTestCase {
|
|||
DfsSearchResult dfsSearchResult = new DfsSearchResult(shardID, null);
|
||||
dfsSearchResult.setShardIndex(shardID);
|
||||
dfsSearchResult.setSearchShardTarget(new SearchShardTarget("foo",
|
||||
new Index("bar", "baz"), shardID));
|
||||
new Index("bar", "baz"), shardID, null));
|
||||
collector.onResult(dfsSearchResult);});
|
||||
break;
|
||||
case 2:
|
||||
state.add(2);
|
||||
executor.execute(() -> collector.onFailure(shardID, new SearchShardTarget("foo", new Index("bar", "baz"),
|
||||
shardID), new RuntimeException("boom")));
|
||||
shardID, null), new RuntimeException("boom")));
|
||||
break;
|
||||
default:
|
||||
fail("unknown state");
|
||||
|
|
|
@ -51,8 +51,8 @@ public class DfsQueryPhaseTests extends ESTestCase {
|
|||
public void testDfsWith2Shards() throws IOException {
|
||||
AtomicArray<DfsSearchResult> results = new AtomicArray<>(2);
|
||||
AtomicReference<AtomicArray<SearchPhaseResult>> responseRef = new AtomicReference<>();
|
||||
results.set(0, newSearchResult(0, 1, new SearchShardTarget("node1", new Index("test", "na"), 0)));
|
||||
results.set(1, newSearchResult(1, 2, new SearchShardTarget("node2", new Index("test", "na"), 0)));
|
||||
results.set(0, newSearchResult(0, 1, new SearchShardTarget("node1", new Index("test", "na"), 0, null)));
|
||||
results.set(1, newSearchResult(1, 2, new SearchShardTarget("node2", new Index("test", "na"), 0, null)));
|
||||
results.get(0).termsStatistics(new Term[0], new TermStatistics[0]);
|
||||
results.get(1).termsStatistics(new Term[0], new TermStatistics[0]);
|
||||
|
||||
|
@ -64,12 +64,14 @@ public class DfsQueryPhaseTests extends ESTestCase {
|
|||
public void sendExecuteQuery(Transport.Connection connection, QuerySearchRequest request, SearchTask task,
|
||||
SearchActionListener<QuerySearchResult> listener) {
|
||||
if (request.id() == 1) {
|
||||
QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node1", new Index("test", "na"), 0));
|
||||
QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node1", new Index("test", "na"), 0,
|
||||
null));
|
||||
queryResult.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(42, 1.0F)}, 2.0F), new DocValueFormat[0]);
|
||||
queryResult.size(2); // the size of the result set
|
||||
listener.onResponse(queryResult);
|
||||
} else if (request.id() == 2) {
|
||||
QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node2", new Index("test", "na"), 0));
|
||||
QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node2", new Index("test", "na"), 0,
|
||||
null));
|
||||
queryResult.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(84, 2.0F)}, 2.0F), new DocValueFormat[0]);
|
||||
queryResult.size(2); // the size of the result set
|
||||
listener.onResponse(queryResult);
|
||||
|
@ -106,8 +108,8 @@ public class DfsQueryPhaseTests extends ESTestCase {
|
|||
public void testDfsWith1ShardFailed() throws IOException {
|
||||
AtomicArray<DfsSearchResult> results = new AtomicArray<>(2);
|
||||
AtomicReference<AtomicArray<SearchPhaseResult>> responseRef = new AtomicReference<>();
|
||||
results.set(0, newSearchResult(0, 1, new SearchShardTarget("node1", new Index("test", "na"), 0)));
|
||||
results.set(1, newSearchResult(1, 2, new SearchShardTarget("node2", new Index("test", "na"), 0)));
|
||||
results.set(0, newSearchResult(0, 1, new SearchShardTarget("node1", new Index("test", "na"), 0, null)));
|
||||
results.set(1, newSearchResult(1, 2, new SearchShardTarget("node2", new Index("test", "na"), 0, null)));
|
||||
results.get(0).termsStatistics(new Term[0], new TermStatistics[0]);
|
||||
results.get(1).termsStatistics(new Term[0], new TermStatistics[0]);
|
||||
|
||||
|
@ -119,7 +121,8 @@ public class DfsQueryPhaseTests extends ESTestCase {
|
|||
public void sendExecuteQuery(Transport.Connection connection, QuerySearchRequest request, SearchTask task,
|
||||
SearchActionListener<QuerySearchResult> listener) {
|
||||
if (request.id() == 1) {
|
||||
QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node1", new Index("test", "na"), 0));
|
||||
QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node1", new Index("test", "na"), 0,
|
||||
null));
|
||||
queryResult.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(42, 1.0F)}, 2.0F), new DocValueFormat[0]);
|
||||
queryResult.size(2); // the size of the result set
|
||||
listener.onResponse(queryResult);
|
||||
|
@ -161,8 +164,8 @@ public class DfsQueryPhaseTests extends ESTestCase {
|
|||
public void testFailPhaseOnException() throws IOException {
|
||||
AtomicArray<DfsSearchResult> results = new AtomicArray<>(2);
|
||||
AtomicReference<AtomicArray<SearchPhaseResult>> responseRef = new AtomicReference<>();
|
||||
results.set(0, newSearchResult(0, 1, new SearchShardTarget("node1", new Index("test", "na"), 0)));
|
||||
results.set(1, newSearchResult(1, 2, new SearchShardTarget("node2", new Index("test", "na"), 0)));
|
||||
results.set(0, newSearchResult(0, 1, new SearchShardTarget("node1", new Index("test", "na"), 0, null)));
|
||||
results.set(1, newSearchResult(1, 2, new SearchShardTarget("node2", new Index("test", "na"), 0, null)));
|
||||
results.get(0).termsStatistics(new Term[0], new TermStatistics[0]);
|
||||
results.get(1).termsStatistics(new Term[0], new TermStatistics[0]);
|
||||
|
||||
|
@ -174,7 +177,8 @@ public class DfsQueryPhaseTests extends ESTestCase {
|
|||
public void sendExecuteQuery(Transport.Connection connection, QuerySearchRequest request, SearchTask task,
|
||||
SearchActionListener<QuerySearchResult> listener) {
|
||||
if (request.id() == 1) {
|
||||
QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node1", new Index("test", "na"), 0));
|
||||
QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node1", new Index("test", "na"), 0,
|
||||
null));
|
||||
queryResult.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(42, 1.0F)}, 2.0F), new DocValueFormat[0]);
|
||||
queryResult.size(2); // the size of the result set
|
||||
listener.onResponse(queryResult);
|
||||
|
|
|
@ -32,7 +32,6 @@ import org.elasticsearch.search.SearchShardTarget;
|
|||
import org.elasticsearch.search.fetch.FetchSearchResult;
|
||||
import org.elasticsearch.search.fetch.QueryFetchSearchResult;
|
||||
import org.elasticsearch.search.fetch.ShardFetchSearchRequest;
|
||||
import org.elasticsearch.search.internal.InternalSearchResponse;
|
||||
import org.elasticsearch.search.query.QuerySearchResult;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.transport.Transport;
|
||||
|
@ -91,13 +90,13 @@ public class FetchSearchPhaseTests extends ESTestCase {
|
|||
controller.newSearchPhaseResults(mockSearchPhaseContext.getRequest(), 2);
|
||||
AtomicReference<SearchResponse> responseRef = new AtomicReference<>();
|
||||
int resultSetSize = randomIntBetween(2, 10);
|
||||
QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node1", new Index("test", "na"), 0));
|
||||
QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node1", new Index("test", "na"), 0, null));
|
||||
queryResult.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(42, 1.0F)}, 2.0F), new DocValueFormat[0]);
|
||||
queryResult.size(resultSetSize); // the size of the result set
|
||||
queryResult.setShardIndex(0);
|
||||
results.consumeResult(queryResult);
|
||||
|
||||
queryResult = new QuerySearchResult(321, new SearchShardTarget("node2", new Index("test", "na"), 1));
|
||||
queryResult = new QuerySearchResult(321, new SearchShardTarget("node2", new Index("test", "na"), 1, null));
|
||||
queryResult.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(84, 2.0F)}, 2.0F), new DocValueFormat[0]);
|
||||
queryResult.size(resultSetSize);
|
||||
queryResult.setShardIndex(1);
|
||||
|
@ -145,13 +144,13 @@ public class FetchSearchPhaseTests extends ESTestCase {
|
|||
controller.newSearchPhaseResults(mockSearchPhaseContext.getRequest(), 2);
|
||||
AtomicReference<SearchResponse> responseRef = new AtomicReference<>();
|
||||
int resultSetSize = randomIntBetween(2, 10);
|
||||
QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node1", new Index("test", "na"), 0));
|
||||
QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node1", new Index("test", "na"), 0, null));
|
||||
queryResult.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(42, 1.0F)}, 2.0F), new DocValueFormat[0]);
|
||||
queryResult.size(resultSetSize); // the size of the result set
|
||||
queryResult.setShardIndex(0);
|
||||
results.consumeResult(queryResult);
|
||||
|
||||
queryResult = new QuerySearchResult(321, new SearchShardTarget("node2", new Index("test", "na"), 1));
|
||||
queryResult = new QuerySearchResult(321, new SearchShardTarget("node2", new Index("test", "na"), 1, null));
|
||||
queryResult.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(84, 2.0F)}, 2.0F), new DocValueFormat[0]);
|
||||
queryResult.size(resultSetSize);
|
||||
queryResult.setShardIndex(1);
|
||||
|
@ -204,7 +203,7 @@ public class FetchSearchPhaseTests extends ESTestCase {
|
|||
controller.newSearchPhaseResults(mockSearchPhaseContext.getRequest(), numHits);
|
||||
AtomicReference<SearchResponse> responseRef = new AtomicReference<>();
|
||||
for (int i = 0; i < numHits; i++) {
|
||||
QuerySearchResult queryResult = new QuerySearchResult(i, new SearchShardTarget("node1", new Index("test", "na"), 0));
|
||||
QuerySearchResult queryResult = new QuerySearchResult(i, new SearchShardTarget("node1", new Index("test", "na"), 0, null));
|
||||
queryResult.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(i+1, i)}, i), new DocValueFormat[0]);
|
||||
queryResult.size(resultSetSize); // the size of the result set
|
||||
queryResult.setShardIndex(i);
|
||||
|
@ -259,13 +258,13 @@ public class FetchSearchPhaseTests extends ESTestCase {
|
|||
controller.newSearchPhaseResults(mockSearchPhaseContext.getRequest(), 2);
|
||||
AtomicReference<SearchResponse> responseRef = new AtomicReference<>();
|
||||
int resultSetSize = randomIntBetween(2, 10);
|
||||
QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node1", new Index("test", "na"), 0));
|
||||
QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node1", new Index("test", "na"), 0, null));
|
||||
queryResult.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(42, 1.0F)}, 2.0F), new DocValueFormat[0]);
|
||||
queryResult.size(resultSetSize); // the size of the result set
|
||||
queryResult.setShardIndex(0);
|
||||
results.consumeResult(queryResult);
|
||||
|
||||
queryResult = new QuerySearchResult(321, new SearchShardTarget("node2", new Index("test", "na"), 1));
|
||||
queryResult = new QuerySearchResult(321, new SearchShardTarget("node2", new Index("test", "na"), 1, null));
|
||||
queryResult.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(84, 2.0F)}, 2.0F), new DocValueFormat[0]);
|
||||
queryResult.size(resultSetSize);
|
||||
queryResult.setShardIndex(1);
|
||||
|
@ -312,13 +311,13 @@ public class FetchSearchPhaseTests extends ESTestCase {
|
|||
controller.newSearchPhaseResults(mockSearchPhaseContext.getRequest(), 2);
|
||||
AtomicReference<SearchResponse> responseRef = new AtomicReference<>();
|
||||
int resultSetSize = 1;
|
||||
QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node1", new Index("test", "na"), 0));
|
||||
QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node1", new Index("test", "na"), 0, null));
|
||||
queryResult.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(42, 1.0F)}, 2.0F), new DocValueFormat[0]);
|
||||
queryResult.size(resultSetSize); // the size of the result set
|
||||
queryResult.setShardIndex(0);
|
||||
results.consumeResult(queryResult);
|
||||
|
||||
queryResult = new QuerySearchResult(321, new SearchShardTarget("node2", new Index("test", "na"), 1));
|
||||
queryResult = new QuerySearchResult(321, new SearchShardTarget("node2", new Index("test", "na"), 1, null));
|
||||
queryResult.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(84, 2.0F)}, 2.0F), new DocValueFormat[0]);
|
||||
queryResult.size(resultSetSize);
|
||||
queryResult.setShardIndex(1);
|
||||
|
|
|
@ -22,7 +22,6 @@ package org.elasticsearch.action.search;
|
|||
import com.carrotsearch.randomizedtesting.RandomizedContext;
|
||||
import org.apache.lucene.search.ScoreDoc;
|
||||
import org.apache.lucene.search.TopDocs;
|
||||
import org.elasticsearch.common.lucene.Lucene;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.text.Text;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
|
@ -43,7 +42,6 @@ import org.elasticsearch.search.query.QuerySearchResult;
|
|||
import org.elasticsearch.search.suggest.Suggest;
|
||||
import org.elasticsearch.search.suggest.completion.CompletionSuggestion;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.TestCluster;
|
||||
import org.junit.Before;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -54,8 +52,6 @@ import java.util.HashMap;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
@ -188,7 +184,7 @@ public class SearchPhaseControllerTests extends ESTestCase {
|
|||
AtomicArray<SearchPhaseResult> queryResults = new AtomicArray<>(nShards);
|
||||
for (int shardIndex = 0; shardIndex < nShards; shardIndex++) {
|
||||
QuerySearchResult querySearchResult = new QuerySearchResult(shardIndex,
|
||||
new SearchShardTarget("", new Index("", ""), shardIndex));
|
||||
new SearchShardTarget("", new Index("", ""), shardIndex, null));
|
||||
TopDocs topDocs = new TopDocs(0, new ScoreDoc[0], 0);
|
||||
if (searchHitsSize > 0) {
|
||||
int nDocs = randomIntBetween(0, searchHitsSize);
|
||||
|
@ -256,7 +252,7 @@ public class SearchPhaseControllerTests extends ESTestCase {
|
|||
AtomicArray<SearchPhaseResult> fetchResults = new AtomicArray<>(nShards);
|
||||
for (int shardIndex = 0; shardIndex < nShards; shardIndex++) {
|
||||
float maxScore = -1F;
|
||||
SearchShardTarget shardTarget = new SearchShardTarget("", new Index("", ""), shardIndex);
|
||||
SearchShardTarget shardTarget = new SearchShardTarget("", new Index("", ""), shardIndex, null);
|
||||
FetchSearchResult fetchSearchResult = new FetchSearchResult(shardIndex, shardTarget);
|
||||
List<SearchHit> searchHits = new ArrayList<>();
|
||||
for (ScoreDoc scoreDoc : mergedSearchDocs) {
|
||||
|
@ -293,7 +289,7 @@ public class SearchPhaseControllerTests extends ESTestCase {
|
|||
request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo")));
|
||||
request.setBatchedReduceSize(bufferSize);
|
||||
InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> consumer = searchPhaseController.newSearchPhaseResults(request, 3);
|
||||
QuerySearchResult result = new QuerySearchResult(0, new SearchShardTarget("node", new Index("a", "b"), 0));
|
||||
QuerySearchResult result = new QuerySearchResult(0, new SearchShardTarget("node", new Index("a", "b"), 0, null));
|
||||
result.topDocs(new TopDocs(0, new ScoreDoc[0], 0.0F), new DocValueFormat[0]);
|
||||
InternalAggregations aggs = new InternalAggregations(Arrays.asList(new InternalMax("test", 1.0D, DocValueFormat.RAW,
|
||||
Collections.emptyList(), Collections.emptyMap())));
|
||||
|
@ -301,7 +297,7 @@ public class SearchPhaseControllerTests extends ESTestCase {
|
|||
result.setShardIndex(0);
|
||||
consumer.consumeResult(result);
|
||||
|
||||
result = new QuerySearchResult(1, new SearchShardTarget("node", new Index("a", "b"), 0));
|
||||
result = new QuerySearchResult(1, new SearchShardTarget("node", new Index("a", "b"), 0, null));
|
||||
result.topDocs(new TopDocs(0, new ScoreDoc[0], 0.0F), new DocValueFormat[0]);
|
||||
aggs = new InternalAggregations(Arrays.asList(new InternalMax("test", 3.0D, DocValueFormat.RAW,
|
||||
Collections.emptyList(), Collections.emptyMap())));
|
||||
|
@ -309,7 +305,7 @@ public class SearchPhaseControllerTests extends ESTestCase {
|
|||
result.setShardIndex(2);
|
||||
consumer.consumeResult(result);
|
||||
|
||||
result = new QuerySearchResult(1, new SearchShardTarget("node", new Index("a", "b"), 0));
|
||||
result = new QuerySearchResult(1, new SearchShardTarget("node", new Index("a", "b"), 0, null));
|
||||
result.topDocs(new TopDocs(0, new ScoreDoc[0], 0.0F), new DocValueFormat[0]);
|
||||
aggs = new InternalAggregations(Arrays.asList(new InternalMax("test", 2.0D, DocValueFormat.RAW,
|
||||
Collections.emptyList(), Collections.emptyMap())));
|
||||
|
@ -348,7 +344,7 @@ public class SearchPhaseControllerTests extends ESTestCase {
|
|||
threads[i] = new Thread(() -> {
|
||||
int number = randomIntBetween(1, 1000);
|
||||
max.updateAndGet(prev -> Math.max(prev, number));
|
||||
QuerySearchResult result = new QuerySearchResult(id, new SearchShardTarget("node", new Index("a", "b"), id));
|
||||
QuerySearchResult result = new QuerySearchResult(id, new SearchShardTarget("node", new Index("a", "b"), id, null));
|
||||
result.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(0, number)}, number), new DocValueFormat[0]);
|
||||
InternalAggregations aggs = new InternalAggregations(Arrays.asList(new InternalMax("test", (double) number,
|
||||
DocValueFormat.RAW, Collections.emptyList(), Collections.emptyMap())));
|
||||
|
@ -385,7 +381,7 @@ public class SearchPhaseControllerTests extends ESTestCase {
|
|||
int id = i;
|
||||
int number = randomIntBetween(1, 1000);
|
||||
max.updateAndGet(prev -> Math.max(prev, number));
|
||||
QuerySearchResult result = new QuerySearchResult(id, new SearchShardTarget("node", new Index("a", "b"), id));
|
||||
QuerySearchResult result = new QuerySearchResult(id, new SearchShardTarget("node", new Index("a", "b"), id, null));
|
||||
result.topDocs(new TopDocs(1, new ScoreDoc[0], number), new DocValueFormat[0]);
|
||||
InternalAggregations aggs = new InternalAggregations(Arrays.asList(new InternalMax("test", (double) number,
|
||||
DocValueFormat.RAW, Collections.emptyList(), Collections.emptyMap())));
|
||||
|
@ -418,7 +414,7 @@ public class SearchPhaseControllerTests extends ESTestCase {
|
|||
int id = i;
|
||||
int number = randomIntBetween(1, 1000);
|
||||
max.updateAndGet(prev -> Math.max(prev, number));
|
||||
QuerySearchResult result = new QuerySearchResult(id, new SearchShardTarget("node", new Index("a", "b"), id));
|
||||
QuerySearchResult result = new QuerySearchResult(id, new SearchShardTarget("node", new Index("a", "b"), id, null));
|
||||
result.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(0, number)}, number), new DocValueFormat[0]);
|
||||
result.setShardIndex(id);
|
||||
result.size(1);
|
||||
|
@ -474,7 +470,7 @@ public class SearchPhaseControllerTests extends ESTestCase {
|
|||
searchPhaseController.newSearchPhaseResults(request, 4);
|
||||
int score = 100;
|
||||
for (int i = 0; i < 4; i++) {
|
||||
QuerySearchResult result = new QuerySearchResult(i, new SearchShardTarget("node", new Index("a", "b"), i));
|
||||
QuerySearchResult result = new QuerySearchResult(i, new SearchShardTarget("node", new Index("a", "b"), i, null));
|
||||
ScoreDoc[] docs = new ScoreDoc[3];
|
||||
for (int j = 0; j < docs.length; j++) {
|
||||
docs[j] = new ScoreDoc(0, score--);
|
||||
|
|
|
@ -49,11 +49,11 @@ public class SearchPhaseExecutionExceptionTests extends ESTestCase {
|
|||
SearchPhaseExecutionException exception = new SearchPhaseExecutionException("test", "all shards failed",
|
||||
new ShardSearchFailure[]{
|
||||
new ShardSearchFailure(new ParsingException(1, 2, "foobar", null),
|
||||
new SearchShardTarget("node_1", new Index("foo", "_na_"), 0)),
|
||||
new SearchShardTarget("node_1", new Index("foo", "_na_"), 0, null)),
|
||||
new ShardSearchFailure(new IndexShardClosedException(new ShardId(new Index("foo", "_na_"), 1)),
|
||||
new SearchShardTarget("node_2", new Index("foo", "_na_"), 1)),
|
||||
new SearchShardTarget("node_2", new Index("foo", "_na_"), 1, null)),
|
||||
new ShardSearchFailure(new ParsingException(5, 7, "foobar", null),
|
||||
new SearchShardTarget("node_3", new Index("foo", "_na_"), 2)),
|
||||
new SearchShardTarget("node_3", new Index("foo", "_na_"), 2, null)),
|
||||
});
|
||||
|
||||
// Failures are grouped (by default)
|
||||
|
@ -150,7 +150,7 @@ public class SearchPhaseExecutionExceptionTests extends ESTestCase {
|
|||
new TimestampParsingException("foo", null),
|
||||
new NullPointerException()
|
||||
);
|
||||
shardSearchFailures[i] = new ShardSearchFailure(cause, new SearchShardTarget("node_" + i, new Index("test", "_na_"), i));
|
||||
shardSearchFailures[i] = new ShardSearchFailure(cause, new SearchShardTarget("node_" + i, new Index("test", "_na_"), i, null));
|
||||
}
|
||||
|
||||
final String phase = randomFrom("query", "search", "other");
|
||||
|
|
|
@ -71,7 +71,7 @@ public class SearchScrollAsyncActionTests extends ESTestCase {
|
|||
SearchAsyncActionTests.TestSearchPhaseResult testSearchPhaseResult =
|
||||
new SearchAsyncActionTests.TestSearchPhaseResult(internalRequest.id(), connection.getNode());
|
||||
testSearchPhaseResult.setSearchShardTarget(new SearchShardTarget(connection.getNode().getId(),
|
||||
new Index("test", "_na_"), 1));
|
||||
new Index("test", "_na_"), 1, null));
|
||||
searchActionListener.onResponse(testSearchPhaseResult);
|
||||
}).start();
|
||||
}
|
||||
|
@ -162,7 +162,7 @@ public class SearchScrollAsyncActionTests extends ESTestCase {
|
|||
SearchAsyncActionTests.TestSearchPhaseResult testSearchPhaseResult =
|
||||
new SearchAsyncActionTests.TestSearchPhaseResult(internalRequest.id(), connection.getNode());
|
||||
testSearchPhaseResult.setSearchShardTarget(new SearchShardTarget(connection.getNode().getId(),
|
||||
new Index("test", "_na_"), 1));
|
||||
new Index("test", "_na_"), 1, null));
|
||||
searchActionListener.onResponse(testSearchPhaseResult);
|
||||
}).start();
|
||||
}
|
||||
|
@ -235,7 +235,7 @@ public class SearchScrollAsyncActionTests extends ESTestCase {
|
|||
SearchAsyncActionTests.TestSearchPhaseResult testSearchPhaseResult =
|
||||
new SearchAsyncActionTests.TestSearchPhaseResult(internalRequest.id(), connection.getNode());
|
||||
testSearchPhaseResult.setSearchShardTarget(new SearchShardTarget(connection.getNode().getId(),
|
||||
new Index("test", "_na_"), 1));
|
||||
new Index("test", "_na_"), 1, null));
|
||||
searchActionListener.onResponse(testSearchPhaseResult);
|
||||
}).start();
|
||||
}
|
||||
|
@ -312,7 +312,7 @@ public class SearchScrollAsyncActionTests extends ESTestCase {
|
|||
SearchAsyncActionTests.TestSearchPhaseResult testSearchPhaseResult =
|
||||
new SearchAsyncActionTests.TestSearchPhaseResult(internalRequest.id(), connection.getNode());
|
||||
testSearchPhaseResult.setSearchShardTarget(new SearchShardTarget(connection.getNode().getId(),
|
||||
new Index("test", "_na_"), 1));
|
||||
new Index("test", "_na_"), 1, null));
|
||||
searchActionListener.onResponse(testSearchPhaseResult);
|
||||
}
|
||||
}).start();
|
||||
|
|
|
@ -198,7 +198,8 @@ public class TransportSearchActionTests extends ESTestCase {
|
|||
assertArrayEquals(new String[]{"some_alias_for_foo", "some_other_foo_alias"},
|
||||
iterator.getOriginalIndices().indices());
|
||||
assertTrue(iterator.shardId().getId() == 0 || iterator.shardId().getId() == 1);
|
||||
assertEquals("test_cluster_1:foo", iterator.shardId().getIndexName());
|
||||
assertEquals("test_cluster_1", iterator.getClusterAlias());
|
||||
assertEquals("foo", iterator.shardId().getIndexName());
|
||||
ShardRouting shardRouting = iterator.nextOrNull();
|
||||
assertNotNull(shardRouting);
|
||||
assertEquals(shardRouting.getIndexName(), "foo");
|
||||
|
@ -209,7 +210,8 @@ public class TransportSearchActionTests extends ESTestCase {
|
|||
} else if (iterator.shardId().getIndexName().endsWith("bar")) {
|
||||
assertArrayEquals(new String[]{"bar"}, iterator.getOriginalIndices().indices());
|
||||
assertEquals(0, iterator.shardId().getId());
|
||||
assertEquals("test_cluster_1:bar", iterator.shardId().getIndexName());
|
||||
assertEquals("test_cluster_1", iterator.getClusterAlias());
|
||||
assertEquals("bar", iterator.shardId().getIndexName());
|
||||
ShardRouting shardRouting = iterator.nextOrNull();
|
||||
assertNotNull(shardRouting);
|
||||
assertEquals(shardRouting.getIndexName(), "bar");
|
||||
|
@ -220,7 +222,8 @@ public class TransportSearchActionTests extends ESTestCase {
|
|||
} else if (iterator.shardId().getIndexName().endsWith("xyz")) {
|
||||
assertArrayEquals(new String[]{"some_alias_for_xyz"}, iterator.getOriginalIndices().indices());
|
||||
assertEquals(0, iterator.shardId().getId());
|
||||
assertEquals("test_cluster_2:xyz", iterator.shardId().getIndexName());
|
||||
assertEquals("xyz", iterator.shardId().getIndexName());
|
||||
assertEquals("test_cluster_2", iterator.getClusterAlias());
|
||||
ShardRouting shardRouting = iterator.nextOrNull();
|
||||
assertNotNull(shardRouting);
|
||||
assertEquals(shardRouting.getIndexName(), "xyz");
|
||||
|
|
|
@ -127,6 +127,11 @@ public class SearchSlowLogTests extends ESSingleNodeTestCase {
|
|||
@Override
|
||||
public void rewrite(QueryShardContext context) throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getClusterAlias() {
|
||||
return null;
|
||||
}
|
||||
};
|
||||
@Override
|
||||
public ShardSearchRequest request() {
|
||||
|
|
|
@ -151,9 +151,9 @@ public class BytesRestResponseTests extends ESTestCase {
|
|||
RestRequest request = new FakeRestRequest();
|
||||
RestChannel channel = new DetailedExceptionRestChannel(request);
|
||||
ShardSearchFailure failure = new ShardSearchFailure(new ParsingException(1, 2, "foobar", null),
|
||||
new SearchShardTarget("node_1", new Index("foo", "_na_"), 1));
|
||||
new SearchShardTarget("node_1", new Index("foo", "_na_"), 1, null));
|
||||
ShardSearchFailure failure1 = new ShardSearchFailure(new ParsingException(1, 2, "foobar", null),
|
||||
new SearchShardTarget("node_1", new Index("foo", "_na_"), 2));
|
||||
new SearchShardTarget("node_1", new Index("foo", "_na_"), 2, null));
|
||||
SearchPhaseExecutionException ex = new SearchPhaseExecutionException("search", "all shards failed", new ShardSearchFailure[] {failure, failure1});
|
||||
BytesRestResponse response = new BytesRestResponse(channel, new RemoteTransportException("foo", ex));
|
||||
String text = response.content().utf8ToString();
|
||||
|
|
|
@ -207,7 +207,8 @@ public class SearchHitTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testSerializeShardTarget() throws Exception {
|
||||
SearchShardTarget target = new SearchShardTarget("_node_id", new Index("_index", "_na_"), 0);
|
||||
String clusterAlias = randomBoolean() ? null : "cluster_alias";
|
||||
SearchShardTarget target = new SearchShardTarget("_node_id", new Index("_index", "_na_"), 0, clusterAlias);
|
||||
|
||||
Map<String, SearchHits> innerHits = new HashMap<>();
|
||||
SearchHit innerHit1 = new SearchHit(0, "_id", new Text("_type"), null);
|
||||
|
@ -233,6 +234,7 @@ public class SearchHitTests extends ESTestCase {
|
|||
|
||||
SearchHits hits = new SearchHits(new SearchHit[]{hit1, hit2}, 2, 1f);
|
||||
|
||||
|
||||
BytesStreamOutput output = new BytesStreamOutput();
|
||||
hits.writeTo(output);
|
||||
InputStream input = output.bytes().streamInput();
|
||||
|
@ -242,6 +244,17 @@ public class SearchHitTests extends ESTestCase {
|
|||
assertThat(results.getAt(0).getInnerHits().get("1").getAt(0).getInnerHits().get("1").getAt(0).getShard(), notNullValue());
|
||||
assertThat(results.getAt(0).getInnerHits().get("1").getAt(1).getShard(), notNullValue());
|
||||
assertThat(results.getAt(0).getInnerHits().get("2").getAt(0).getShard(), notNullValue());
|
||||
for (SearchHit hit : results) {
|
||||
assertEquals(clusterAlias, hit.getClusterAlias());
|
||||
if (hit.getInnerHits() != null) {
|
||||
for (SearchHits innerhits : hit.getInnerHits().values()) {
|
||||
for (SearchHit innerHit : innerhits) {
|
||||
assertEquals(clusterAlias, innerHit.getClusterAlias());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
assertThat(results.getAt(1).getShard(), equalTo(target));
|
||||
}
|
||||
|
||||
|
|
|
@ -86,7 +86,7 @@ public class SignificanceHeuristicTests extends ESTestCase {
|
|||
|
||||
@Override
|
||||
public SearchShardTarget shardTarget() {
|
||||
return new SearchShardTarget("no node, this is a unit test", new Index("no index, this is a unit test", "_na_"), 0);
|
||||
return new SearchShardTarget("no node, this is a unit test", new Index("no index, this is a unit test", "_na_"), 0, null);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -97,7 +97,7 @@ public class ShardSearchTransportRequestTests extends AbstractSearchTestCase {
|
|||
filteringAliases = new AliasFilter(null, Strings.EMPTY_ARRAY);
|
||||
}
|
||||
return new ShardSearchTransportRequest(new OriginalIndices(searchRequest), searchRequest, shardId,
|
||||
randomIntBetween(1, 100), filteringAliases, randomBoolean() ? 1.0f : randomFloat(), Math.abs(randomLong()));
|
||||
randomIntBetween(1, 100), filteringAliases, randomBoolean() ? 1.0f : randomFloat(), Math.abs(randomLong()), null);
|
||||
}
|
||||
|
||||
public void testFilteringAliases() throws Exception {
|
||||
|
|
|
@ -0,0 +1,39 @@
|
|||
---
|
||||
"Test that remote index names are preserved in top hits":
|
||||
|
||||
- do:
|
||||
indices.create:
|
||||
index: single_doc_index
|
||||
body:
|
||||
settings:
|
||||
index:
|
||||
number_of_shards: 1
|
||||
number_of_replicas: 0
|
||||
|
||||
- do:
|
||||
bulk:
|
||||
refresh: true
|
||||
body:
|
||||
- '{"index": {"_index": "single_doc_index", "_type": "test_type"}}'
|
||||
- '{"f1": "local_cluster", "sort_field": 0}'
|
||||
- do:
|
||||
search:
|
||||
index: "single_doc_index,my_remote_cluster:single_doc_index"
|
||||
body:
|
||||
sort: "sort_field"
|
||||
aggs:
|
||||
cluster:
|
||||
top_hits:
|
||||
size: 2
|
||||
sort: "sort_field"
|
||||
|
||||
- match: { _shards.total: 2 }
|
||||
- match: { hits.total: 2 }
|
||||
- match: { hits.hits.0._index: "single_doc_index"}
|
||||
- match: { hits.hits.1._index: "my_remote_cluster:single_doc_index"}
|
||||
|
||||
- length: { aggregations.cluster.hits.hits: 2 }
|
||||
- match: { aggregations.cluster.hits.hits.0._index: "single_doc_index" }
|
||||
- match: { aggregations.cluster.hits.hits.0._source.f1: "local_cluster" }
|
||||
- match: { aggregations.cluster.hits.hits.1._index: "my_remote_cluster:single_doc_index" }
|
||||
- match: { aggregations.cluster.hits.hits.1._source.f1: "remote_cluster" }
|
|
@ -1,6 +1,22 @@
|
|||
---
|
||||
"Index data and search on the old cluster":
|
||||
|
||||
- do:
|
||||
indices.create:
|
||||
index: single_doc_index
|
||||
body:
|
||||
settings:
|
||||
index:
|
||||
number_of_shards: 1
|
||||
number_of_replicas: 0
|
||||
|
||||
- do:
|
||||
bulk:
|
||||
refresh: true
|
||||
body:
|
||||
- '{"index": {"_index": "single_doc_index", "_type": "test_type"}}'
|
||||
- '{"f1": "remote_cluster", "sort_field": 1}'
|
||||
|
||||
- do:
|
||||
indices.create:
|
||||
index: field_caps_index_1
|
||||
|
|
|
@ -45,7 +45,7 @@ public class MockSearchServiceTests extends ESTestCase {
|
|||
|
||||
@Override
|
||||
public SearchShardTarget shardTarget() {
|
||||
return new SearchShardTarget("node", new Index("idx", "ignored"), 0);
|
||||
return new SearchShardTarget("node", new Index("idx", "ignored"), 0, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
Loading…
Reference in New Issue