diff --git a/src/main/java/org/elasticsearch/action/IndicesRequest.java b/src/main/java/org/elasticsearch/action/IndicesRequest.java index ea875318f23..b9985225620 100644 --- a/src/main/java/org/elasticsearch/action/IndicesRequest.java +++ b/src/main/java/org/elasticsearch/action/IndicesRequest.java @@ -24,6 +24,8 @@ import org.elasticsearch.action.support.IndicesOptions; /** * Needs to be implemented by all {@link org.elasticsearch.action.ActionRequest} subclasses that relate to * one or more indices. Allows to retrieve which indices the action relates to. + * In case of internal requests originated during the distributed execution of an external request, + * they will still return the indices that the original request related to. */ public interface IndicesRequest { diff --git a/src/main/java/org/elasticsearch/action/OriginalIndices.java b/src/main/java/org/elasticsearch/action/OriginalIndices.java new file mode 100644 index 00000000000..bc09300630d --- /dev/null +++ b/src/main/java/org/elasticsearch/action/OriginalIndices.java @@ -0,0 +1,99 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action; + +import org.elasticsearch.Version; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +/** + * Used to keep track of original indices within internal (e.g. shard level) requests + */ +public class OriginalIndices implements IndicesRequest { + + public static OriginalIndices EMPTY = new OriginalIndices(); + + private final String[] indices; + private final IndicesOptions indicesOptions; + + private OriginalIndices() { + this.indices = null; + this.indicesOptions = null; + } + + public OriginalIndices(IndicesRequest indicesRequest) { + this.indices = indicesRequest.indices(); + this.indicesOptions = indicesRequest.indicesOptions(); + } + + public OriginalIndices(String[] indices, IndicesOptions indicesOptions) { + this.indices = indices; + this.indicesOptions = indicesOptions; + } + + @Override + public String[] indices() { + return indices; + } + + @Override + public IndicesOptions indicesOptions() { + return indicesOptions; + } + + public static OriginalIndices readOptionalOriginalIndices(StreamInput in) throws IOException { + if (in.getVersion().onOrAfter(Version.V_1_4_0)) { + boolean empty = in.readBoolean(); + if (!empty) { + return new OriginalIndices(in.readStringArray(), IndicesOptions.readIndicesOptions(in)); + } + } + return OriginalIndices.EMPTY; + } + + public static void writeOptionalOriginalIndices(OriginalIndices originalIndices, StreamOutput out) throws IOException { + if (out.getVersion().onOrAfter(Version.V_1_4_0)) { + boolean empty = originalIndices == EMPTY; + out.writeBoolean(empty); + if (!empty) { + out.writeStringArrayNullable(originalIndices.indices); + originalIndices.indicesOptions.writeIndicesOptions(out); + } + } + } + + public static OriginalIndices readOriginalIndices(StreamInput in) throws IOException { + if (in.getVersion().onOrAfter(Version.V_1_4_0)) { + return new OriginalIndices(in.readStringArray(), IndicesOptions.readIndicesOptions(in)); + } + return OriginalIndices.EMPTY; + } + + + public static void writeOriginalIndices(OriginalIndices originalIndices, StreamOutput out) throws IOException { + if (out.getVersion().onOrAfter(Version.V_1_4_0)) { + out.writeStringArrayNullable(originalIndices.indices); + originalIndices.indicesOptions.writeIndicesOptions(out); + } + } +} diff --git a/src/main/java/org/elasticsearch/action/admin/indices/mapping/get/GetFieldMappingsIndexRequest.java b/src/main/java/org/elasticsearch/action/admin/indices/mapping/get/GetFieldMappingsIndexRequest.java index 98df1dc0c71..b8a14801199 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/mapping/get/GetFieldMappingsIndexRequest.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/mapping/get/GetFieldMappingsIndexRequest.java @@ -19,6 +19,8 @@ package org.elasticsearch.action.admin.indices.mapping.get; +import org.elasticsearch.action.OriginalIndices; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.single.custom.SingleCustomOperationRequest; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; @@ -33,10 +35,13 @@ class GetFieldMappingsIndexRequest extends SingleCustomOperationRequest indices = new ArrayList<>(); + for (BulkItemRequest item : items) { + if (item != null) { + indices.add(item.index()); + } + } + return indices.toArray(new String[indices.size()]); + } + /** * Before we fork on a local thread, make sure we copy over the bytes if they are unsafe */ diff --git a/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index a541cab2c13..b47c9694d0e 100644 --- a/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -296,7 +296,7 @@ public class TransportBulkAction extends HandledTransportAction> entry : requestsByShard.entrySet()) { final ShardId shardId = entry.getKey(); final List requests = entry.getValue(); - BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId.index().name(), shardId.id(), bulkRequest.refresh(), requests.toArray(new BulkItemRequest[requests.size()])); + BulkShardRequest bulkShardRequest = new BulkShardRequest(bulkRequest, shardId.index().name(), shardId.id(), bulkRequest.refresh(), requests.toArray(new BulkItemRequest[requests.size()])); bulkShardRequest.replicationType(bulkRequest.replicationType()); bulkShardRequest.consistencyLevel(bulkRequest.consistencyLevel()); bulkShardRequest.timeout(bulkRequest.timeout()); diff --git a/src/main/java/org/elasticsearch/action/count/TransportCountAction.java b/src/main/java/org/elasticsearch/action/count/TransportCountAction.java index 080a830f801..a52d53360c3 100644 --- a/src/main/java/org/elasticsearch/action/count/TransportCountAction.java +++ b/src/main/java/org/elasticsearch/action/count/TransportCountAction.java @@ -170,7 +170,7 @@ public class TransportCountAction extends TransportBroadcastOperationAction routing, @Nullable String[] filteringAliases, long nowInMillis) { - super(index, request.timeout(), request.replicationType(), request.consistencyLevel()); + super(index, request.timeout(), request.replicationType(), request.consistencyLevel(), request.indices(), request.indicesOptions(), request); this.source = request.source(); this.types = request.types(); this.routing = routing; diff --git a/src/main/java/org/elasticsearch/action/deletebyquery/ShardDeleteByQueryRequest.java b/src/main/java/org/elasticsearch/action/deletebyquery/ShardDeleteByQueryRequest.java index 479f7e35601..9215cf765b6 100644 --- a/src/main/java/org/elasticsearch/action/deletebyquery/ShardDeleteByQueryRequest.java +++ b/src/main/java/org/elasticsearch/action/deletebyquery/ShardDeleteByQueryRequest.java @@ -21,6 +21,8 @@ package org.elasticsearch.action.deletebyquery; import org.elasticsearch.Version; import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.OriginalIndices; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.replication.ShardReplicationOperationRequest; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; @@ -50,6 +52,8 @@ public class ShardDeleteByQueryRequest extends ShardReplicationOperationRequest< private String[] filteringAliases; private long nowInMillis; + private OriginalIndices originalIndices; + ShardDeleteByQueryRequest(IndexDeleteByQueryRequest request, int shardId) { super(request); this.index = request.index(); @@ -62,6 +66,7 @@ public class ShardDeleteByQueryRequest extends ShardReplicationOperationRequest< this.routing = request.routing(); filteringAliases = request.filteringAliases(); nowInMillis = request.nowInMillis(); + this.originalIndices = new OriginalIndices(request); } ShardDeleteByQueryRequest() { @@ -100,6 +105,16 @@ public class ShardDeleteByQueryRequest extends ShardReplicationOperationRequest< return nowInMillis; } + @Override + public String[] indices() { + return originalIndices.indices(); + } + + @Override + public IndicesOptions indicesOptions() { + return originalIndices.indicesOptions(); + } + @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); @@ -126,6 +141,7 @@ public class ShardDeleteByQueryRequest extends ShardReplicationOperationRequest< } else { nowInMillis = System.currentTimeMillis(); } + originalIndices = OriginalIndices.readOriginalIndices(in); } @Override @@ -153,6 +169,7 @@ public class ShardDeleteByQueryRequest extends ShardReplicationOperationRequest< if (out.getVersion().onOrAfter(Version.V_1_2_0)) { out.writeVLong(nowInMillis); } + OriginalIndices.writeOriginalIndices(originalIndices, out); } @Override diff --git a/src/main/java/org/elasticsearch/action/deletebyquery/TransportShardDeleteByQueryAction.java b/src/main/java/org/elasticsearch/action/deletebyquery/TransportShardDeleteByQueryAction.java index 408e722524b..efdf5c40efa 100644 --- a/src/main/java/org/elasticsearch/action/deletebyquery/TransportShardDeleteByQueryAction.java +++ b/src/main/java/org/elasticsearch/action/deletebyquery/TransportShardDeleteByQueryAction.java @@ -108,7 +108,7 @@ public class TransportShardDeleteByQueryAction extends TransportShardReplication IndexService indexService = indicesService.indexServiceSafe(shardRequest.shardId.getIndex()); IndexShard indexShard = indexService.shardSafe(shardRequest.shardId.id()); - SearchContext.setCurrent(new DefaultSearchContext(0, new ShardSearchRequest().types(request.types()).nowInMillis(request.nowInMillis()), null, + SearchContext.setCurrent(new DefaultSearchContext(0, new ShardSearchRequest(shardRequest.request).types(request.types()).nowInMillis(request.nowInMillis()), null, indexShard.acquireSearcher(DELETE_BY_QUERY_API), indexService, indexShard, scriptService, cacheRecycler, pageCacheRecycler, bigArrays)); try { @@ -130,7 +130,7 @@ public class TransportShardDeleteByQueryAction extends TransportShardReplication IndexService indexService = indicesService.indexServiceSafe(shardRequest.shardId.getIndex()); IndexShard indexShard = indexService.shardSafe(shardRequest.shardId.id()); - SearchContext.setCurrent(new DefaultSearchContext(0, new ShardSearchRequest().types(request.types()).nowInMillis(request.nowInMillis()), null, + SearchContext.setCurrent(new DefaultSearchContext(0, new ShardSearchRequest(shardRequest).types(request.types()).nowInMillis(request.nowInMillis()), null, indexShard.acquireSearcher(DELETE_BY_QUERY_API, IndexShard.Mode.WRITE), indexService, indexShard, scriptService, cacheRecycler, pageCacheRecycler, bigArrays)); try { diff --git a/src/main/java/org/elasticsearch/action/exists/TransportExistsAction.java b/src/main/java/org/elasticsearch/action/exists/TransportExistsAction.java index 9b2d3dd2338..a8fd65671ac 100644 --- a/src/main/java/org/elasticsearch/action/exists/TransportExistsAction.java +++ b/src/main/java/org/elasticsearch/action/exists/TransportExistsAction.java @@ -171,7 +171,7 @@ public class TransportExistsAction extends TransportBroadcastOperationAction { this.type = "_all"; } + /** + * Constructs a new get request starting from the provided request, meaning that it will + * inherit its headers and context, and against the specified index. + */ + public GetRequest(ActionRequest request, String index) { + super(request, index); + } + /** * Constructs a new get request against the specified index with the type and id. * diff --git a/src/main/java/org/elasticsearch/action/get/MultiGetShardRequest.java b/src/main/java/org/elasticsearch/action/get/MultiGetShardRequest.java index cb1853600b8..62036e1f042 100644 --- a/src/main/java/org/elasticsearch/action/get/MultiGetShardRequest.java +++ b/src/main/java/org/elasticsearch/action/get/MultiGetShardRequest.java @@ -23,7 +23,6 @@ import com.carrotsearch.hppc.IntArrayList; import com.carrotsearch.hppc.LongArrayList; import org.elasticsearch.Version; import org.elasticsearch.action.support.single.shard.SingleShardOperationRequest; -import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.lucene.uid.Versions; @@ -43,27 +42,21 @@ public class MultiGetShardRequest extends SingleShardOperationRequest types; - List ids; - List fields; - LongArrayList versions; - List versionTypes; - List fetchSourceContexts; + List items; MultiGetShardRequest() { } - MultiGetShardRequest(String index, int shardId) { - super(index); + MultiGetShardRequest(MultiGetRequest multiGetRequest, String index, int shardId) { + super(multiGetRequest, index); this.shardId = shardId; locations = new IntArrayList(); - types = new ArrayList<>(); - ids = new ArrayList<>(); - fields = new ArrayList<>(); - versions = new LongArrayList(); - versionTypes = new ArrayList<>(); - fetchSourceContexts = new ArrayList<>(); + items = new ArrayList<>(); + preference = multiGetRequest.preference; + realtime = multiGetRequest.realtime; + refresh = multiGetRequest.refresh; + ignoreErrorsOnGeneratedFields = multiGetRequest.ignoreErrorsOnGeneratedFields; } public int shardId() { @@ -107,14 +100,18 @@ public class MultiGetShardRequest extends SingleShardOperationRequest(size); - ids = new ArrayList<>(size); - fields = new ArrayList<>(size); - versions = new LongArrayList(size); - versionTypes = new ArrayList<>(size); - fetchSourceContexts = new ArrayList<>(size); - for (int i = 0; i < size; i++) { - locations.add(in.readVInt()); - if (in.readBoolean()) { - types.add(in.readSharedString()); - } else { - types.add(null); - } - ids.add(in.readString()); - int size1 = in.readVInt(); - if (size1 > 0) { - String[] fields = new String[size1]; - for (int j = 0; j < size1; j++) { - fields[j] = in.readString(); - } - this.fields.add(fields); - } else { - fields.add(null); - } - versions.add(Versions.readVersionWithVLongForBW(in)); - versionTypes.add(VersionType.fromValue(in.readByte())); + items = new ArrayList<>(size); - fetchSourceContexts.add(FetchSourceContext.optionalReadFromStream(in)); + if (in.getVersion().onOrAfter(Version.V_1_4_0)) { + for (int i = 0; i < size; i++) { + locations.add(in.readVInt()); + items.add(MultiGetRequest.Item.readItem(in)); + } + } else { + List types = new ArrayList<>(size); + List ids = new ArrayList<>(size); + List fields = new ArrayList<>(size); + LongArrayList versions = new LongArrayList(size); + List versionTypes = new ArrayList<>(size); + List fetchSourceContexts = new ArrayList<>(size); + + for (int i = 0; i < size; i++) { + locations.add(in.readVInt()); + if (in.readBoolean()) { + types.add(in.readSharedString()); + } else { + types.add(null); + } + ids.add(in.readString()); + int size1 = in.readVInt(); + if (size1 > 0) { + String[] fieldsArray = new String[size1]; + for (int j = 0; j < size1; j++) { + fieldsArray[j] = in.readString(); + } + fields.add(fieldsArray); + } else { + fields.add(null); + } + versions.add(Versions.readVersionWithVLongForBW(in)); + versionTypes.add(VersionType.fromValue(in.readByte())); + + fetchSourceContexts.add(FetchSourceContext.optionalReadFromStream(in)); + } + + for (int i = 0; i < size; i++) { + //before 1.4 we have only one index, the concrete one + MultiGetRequest.Item item = new MultiGetRequest.Item(index, types.get(i), ids.get(i)) + .fields(fields.get(i)).version(versions.get(i)).versionType(versionTypes.get(i)) + .fetchSourceContext(fetchSourceContexts.get(i)); + items.add(item); + } } preference = in.readOptionalString(); @@ -168,35 +183,43 @@ public class MultiGetShardRequest extends SingleShardOperationRequest items; - public Request() { + Request() { } - public Request(String concreteIndex, int shardId, String preference) { + Request(String concreteIndex, int shardId, String preference) { this.index = concreteIndex; this.shardId = shardId; this.preference = preference; @@ -133,7 +135,11 @@ public class TransportShardMultiPercolateAction extends TransportShardSingleOper @Override public String[] indices() { - return new String[]{index}; + List indices = new ArrayList<>(); + for (Item item : items) { + Collections.addAll(indices, item.request.indices()); + } + return indices.toArray(new String[indices.size()]); } public int shardId() { @@ -157,7 +163,8 @@ public class TransportShardMultiPercolateAction extends TransportShardSingleOper items = new ArrayList<>(size); for (int i = 0; i < size; i++) { int slot = in.readVInt(); - PercolateShardRequest shardRequest = new PercolateShardRequest(new ShardId(index, shardId)); + OriginalIndices originalIndices = OriginalIndices.readOriginalIndices(in); + PercolateShardRequest shardRequest = new PercolateShardRequest(new ShardId(index, shardId), originalIndices); shardRequest.documentType(in.readString()); shardRequest.source(in.readBytesReference()); shardRequest.docSource(in.readBytesReference()); @@ -175,6 +182,7 @@ public class TransportShardMultiPercolateAction extends TransportShardSingleOper out.writeVInt(items.size()); for (Item item : items) { out.writeVInt(item.slot); + OriginalIndices.writeOriginalIndices(item.request.originalIndices(), out); out.writeString(item.request.documentType()); out.writeBytesReference(item.request.source()); out.writeBytesReference(item.request.docSource()); @@ -182,7 +190,7 @@ public class TransportShardMultiPercolateAction extends TransportShardSingleOper } } - public static class Item { + static class Item { private final int slot; private final PercolateShardRequest request; diff --git a/src/main/java/org/elasticsearch/action/search/SearchRequest.java b/src/main/java/org/elasticsearch/action/search/SearchRequest.java index 3410786fb85..50cf434ce32 100644 --- a/src/main/java/org/elasticsearch/action/search/SearchRequest.java +++ b/src/main/java/org/elasticsearch/action/search/SearchRequest.java @@ -94,6 +94,14 @@ public class SearchRequest extends ActionRequest implements Indic public SearchRequest() { } + /** + * Constructs a new search request starting from the provided request, meaning that it will + * inherit its headers and context + */ + public SearchRequest(ActionRequest request) { + super(request); + } + /** * Constructs a new search request against the indices. No indices provided here means that search * will run against all indices. diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java index 022bf11af28..68d796eb6e0 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java @@ -39,7 +39,6 @@ import org.elasticsearch.search.fetch.FetchSearchRequest; import org.elasticsearch.search.fetch.FetchSearchResult; import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.search.internal.ShardSearchRequest; -import org.elasticsearch.search.query.QuerySearchResult; import org.elasticsearch.search.query.QuerySearchResultProvider; import org.elasticsearch.threadpool.ThreadPool; diff --git a/src/main/java/org/elasticsearch/action/support/broadcast/BroadcastShardOperationRequest.java b/src/main/java/org/elasticsearch/action/support/broadcast/BroadcastShardOperationRequest.java index 3c88baffbd1..f6c917fbf9b 100644 --- a/src/main/java/org/elasticsearch/action/support/broadcast/BroadcastShardOperationRequest.java +++ b/src/main/java/org/elasticsearch/action/support/broadcast/BroadcastShardOperationRequest.java @@ -20,6 +20,7 @@ package org.elasticsearch.action.support.broadcast; import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -35,41 +36,47 @@ public abstract class BroadcastShardOperationRequest extends TransportRequest im private ShardId shardId; + protected OriginalIndices originalIndices; + protected BroadcastShardOperationRequest() { } protected BroadcastShardOperationRequest(ShardId shardId, BroadcastOperationRequest request) { super(request); this.shardId = shardId; + this.originalIndices = new OriginalIndices(request); } - protected BroadcastShardOperationRequest(ShardId shardId) { + protected BroadcastShardOperationRequest(ShardId shardId, OriginalIndices originalIndices) { this.shardId = shardId; - } - - @Override - public String[] indices() { - return new String[]{shardId.getIndex()}; - } - - @Override - public IndicesOptions indicesOptions() { - return IndicesOptions.strictSingleIndexNoExpandForbidClosed(); + this.originalIndices = originalIndices; } public ShardId shardId() { return this.shardId; } + @Override + public String[] indices() { + return originalIndices.indices(); + } + + @Override + public IndicesOptions indicesOptions() { + return originalIndices.indicesOptions(); + } + @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); shardId = ShardId.readShardId(in); + originalIndices = OriginalIndices.readOriginalIndices(in); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); shardId.writeTo(out); + OriginalIndices.writeOriginalIndices(originalIndices, out); } } diff --git a/src/main/java/org/elasticsearch/action/support/replication/IndexReplicationOperationRequest.java b/src/main/java/org/elasticsearch/action/support/replication/IndexReplicationOperationRequest.java index 27386f9bbaa..9f13ccf629e 100644 --- a/src/main/java/org/elasticsearch/action/support/replication/IndexReplicationOperationRequest.java +++ b/src/main/java/org/elasticsearch/action/support/replication/IndexReplicationOperationRequest.java @@ -19,10 +19,7 @@ package org.elasticsearch.action.support.replication; -import org.elasticsearch.action.ActionRequest; -import org.elasticsearch.action.ActionRequestValidationException; -import org.elasticsearch.action.IndicesRequest; -import org.elasticsearch.action.WriteConsistencyLevel; +import org.elasticsearch.action.*; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -41,12 +38,16 @@ public abstract class IndexReplicationOperationRequest(); @@ -71,6 +71,15 @@ public class MultiTermVectorsShardRequest extends SingleShardOperationRequest indices = new ArrayList<>(); + private Client nodeClient; + + @Override + protected int minimumNumberOfShards() { + //makes sure that a reduce is always needed when searching + return 2; + } + + @Override + protected int minimumNumberOfReplicas() { + //makes sure that write operations get sent to the replica as well + //so we are able to intercept those messages and check them + return 1; + } + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return ImmutableSettings.settingsBuilder() + .put(TransportModule.TRANSPORT_SERVICE_TYPE_KEY, InterceptingTransportService.class.getName()) + .put(super.nodeSettings(nodeOrdinal)).build(); + } + + @Before + public void setup() { + //make sure there is a node client around before each test starts + nodeClient = internalCluster().clientNodeClient(); + int numIndices = iterations(1, 5); + for (int i = 0; i < numIndices; i++) { + indices.add("test" + i); + } + for (String index : indices) { + assertAcked(prepareCreate(index).addAlias(new Alias(index + "-alias"))); + } + ensureGreen(); + } + + @After + public void cleanUp() { + assertAllRequestsHaveBeenConsumed(); + clearInterceptedActions(); + indices.clear(); + } + + @Test + public void testGetFieldMappings() { + String getFieldMappingsShardAction = GetFieldMappingsAction.NAME + "[index][s]"; + interceptTransportActions(getFieldMappingsShardAction); + + GetFieldMappingsRequest getFieldMappingsRequest = new GetFieldMappingsRequest(); + getFieldMappingsRequest.indices(randomIndicesOrAliases()); + nodeClient.admin().indices().getFieldMappings(getFieldMappingsRequest).actionGet(); + + assertSameIndices(getFieldMappingsRequest, getFieldMappingsShardAction); + } + + @Test + public void testAnalyze() { + String analyzeShardAction = AnalyzeAction.NAME + "[s]"; + interceptTransportActions(analyzeShardAction); + + AnalyzeRequest analyzeRequest = new AnalyzeRequest(randomIndexOrAlias(), "text"); + nodeClient.admin().indices().analyze(analyzeRequest).actionGet(); + + assertSameIndices(analyzeRequest, analyzeShardAction); + } + + @Test + public void testIndex() { + String[] indexShardActions = new String[]{IndexAction.NAME, IndexAction.NAME + "[r]"}; + interceptTransportActions(indexShardActions); + + IndexRequest indexRequest = new IndexRequest(randomIndexOrAlias(), "type", "id").source("field", "value"); + nodeClient.index(indexRequest).actionGet(); + + assertSameIndices(indexRequest, indexShardActions); + } + + @Test + public void testDelete() { + String[] deleteShardActions = new String[]{DeleteAction.NAME, DeleteAction.NAME + "[r]"}; + interceptTransportActions(deleteShardActions); + + DeleteRequest deleteRequest = new DeleteRequest(randomIndexOrAlias(), "type", "id"); + nodeClient.delete(deleteRequest).actionGet(); + + assertSameIndices(deleteRequest, deleteShardActions); + } + + @Test + public void testUpdate() { + //update action goes to the primary, index op gets executed locally, then replicated + String[] updateShardActions = new String[]{UpdateAction.NAME, IndexAction.NAME + "[r]"}; + interceptTransportActions(updateShardActions); + + String indexOrAlias = randomIndexOrAlias(); + client().prepareIndex(indexOrAlias, "type", "id").setSource("field", "value").get(); + UpdateRequest updateRequest = new UpdateRequest(indexOrAlias, "type", "id").doc("field1", "value1"); + UpdateResponse updateResponse = nodeClient.update(updateRequest).actionGet(); + assertThat(updateResponse.isCreated(), equalTo(false)); + + assertSameIndices(updateRequest, updateShardActions); + } + + @Test + public void testUpdateUpsert() { + //update action goes to the primary, index op gets executed locally, then replicated + String[] updateShardActions = new String[]{UpdateAction.NAME, IndexAction.NAME + "[r]"}; + interceptTransportActions(updateShardActions); + + String indexOrAlias = randomIndexOrAlias(); + UpdateRequest updateRequest = new UpdateRequest(indexOrAlias, "type", "id").upsert("field", "value").doc("field1", "value1"); + UpdateResponse updateResponse = nodeClient.update(updateRequest).actionGet(); + assertThat(updateResponse.isCreated(), equalTo(true)); + + assertSameIndices(updateRequest, updateShardActions); + } + + @Test + public void testUpdateDelete() { + //update action goes to the primary, delete op gets executed locally, then replicated + String[] updateShardActions = new String[]{UpdateAction.NAME, DeleteAction.NAME + "[r]"}; + interceptTransportActions(updateShardActions); + + String indexOrAlias = randomIndexOrAlias(); + client().prepareIndex(indexOrAlias, "type", "id").setSource("field", "value").get(); + UpdateRequest updateRequest = new UpdateRequest(indexOrAlias, "type", "id").script("ctx.op='delete'"); + UpdateResponse updateResponse = nodeClient.update(updateRequest).actionGet(); + assertThat(updateResponse.isCreated(), equalTo(false)); + + assertSameIndices(updateRequest, updateShardActions); + } + + @Test + public void testDeleteByQuery() { + String[] deleteByQueryShardActions = new String[]{DeleteByQueryAction.NAME + "[s]", DeleteByQueryAction.NAME + "[s][r]"}; + interceptTransportActions(deleteByQueryShardActions); + + DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(randomIndicesOrAliases()).source(new QuerySourceBuilder().setQuery(QueryBuilders.matchAllQuery())); + nodeClient.deleteByQuery(deleteByQueryRequest).actionGet(); + + assertSameIndices(deleteByQueryRequest, deleteByQueryShardActions); + } + + @Test + public void testBulk() { + String[] bulkShardActions = new String[]{BulkAction.NAME + "[s]", BulkAction.NAME + "[s][r]"}; + interceptTransportActions(bulkShardActions); + + List indices = new ArrayList<>(); + BulkRequest bulkRequest = new BulkRequest(); + int numIndexRequests = iterations(1, 10); + for (int i = 0; i < numIndexRequests; i++) { + String indexOrAlias = randomIndexOrAlias(); + bulkRequest.add(new IndexRequest(indexOrAlias, "type", "id").source("field", "value")); + indices.add(indexOrAlias); + } + int numDeleteRequests = iterations(1, 10); + for (int i = 0; i < numDeleteRequests; i++) { + String indexOrAlias = randomIndexOrAlias(); + bulkRequest.add(new DeleteRequest(indexOrAlias, "type", "id")); + indices.add(indexOrAlias); + } + int numUpdateRequests = iterations(1, 10); + for (int i = 0; i < numUpdateRequests; i++) { + String indexOrAlias = randomIndexOrAlias(); + bulkRequest.add(new UpdateRequest(indexOrAlias, "type", "id").doc("field1", "value1")); + indices.add(indexOrAlias); + } + + nodeClient.bulk(bulkRequest).actionGet(); + + assertIndicesSubset(indices, bulkShardActions); + } + + @Test + public void testGet() { + String getShardAction = GetAction.NAME + "[s]"; + interceptTransportActions(getShardAction); + + GetRequest getRequest = new GetRequest(randomIndexOrAlias(), "type", "id"); + nodeClient.get(getRequest).actionGet(); + + assertSameIndices(getRequest, getShardAction); + } + + @Test + public void testExplain() { + String explainShardAction = ExplainAction.NAME + "[s]"; + interceptTransportActions(explainShardAction); + + ExplainRequest explainRequest = new ExplainRequest(randomIndexOrAlias(), "type", "id").source(new QuerySourceBuilder().setQuery(QueryBuilders.matchAllQuery())); + nodeClient.explain(explainRequest).actionGet(); + + assertSameIndices(explainRequest, explainShardAction); + } + + @Test + public void testTermVector() { + String termVectorShardAction = TermVectorAction.NAME + "[s]"; + interceptTransportActions(termVectorShardAction); + + TermVectorRequest termVectorRequest = new TermVectorRequest(randomIndexOrAlias(), "type", "id"); + nodeClient.termVector(termVectorRequest).actionGet(); + + assertSameIndices(termVectorRequest, termVectorShardAction); + } + + @Test + public void testMultiTermVector() { + String multiTermVectorsShardAction = MultiTermVectorsAction.NAME + "[shard][s]"; + interceptTransportActions(multiTermVectorsShardAction); + + List indices = new ArrayList<>(); + MultiTermVectorsRequest multiTermVectorsRequest = new MultiTermVectorsRequest(); + int numDocs = iterations(1, 30); + for (int i = 0; i < numDocs; i++) { + String indexOrAlias = randomIndexOrAlias(); + multiTermVectorsRequest.add(indexOrAlias, "type", Integer.toString(i)); + indices.add(indexOrAlias); + } + nodeClient.multiTermVectors(multiTermVectorsRequest).actionGet(); + + assertIndicesSubset(indices, multiTermVectorsShardAction); + } + + @Test + public void testMultiGet() { + String multiGetShardAction = MultiGetAction.NAME + "[shard][s]"; + interceptTransportActions(multiGetShardAction); + + List indices = new ArrayList<>(); + MultiGetRequest multiGetRequest = new MultiGetRequest(); + int numDocs = iterations(1, 30); + for (int i = 0; i < numDocs; i++) { + String indexOrAlias = randomIndexOrAlias(); + multiGetRequest.add(indexOrAlias, "type", Integer.toString(i)); + indices.add(indexOrAlias); + } + nodeClient.multiGet(multiGetRequest).actionGet(); + + assertIndicesSubset(indices, multiGetShardAction); + } + + @Test + public void testCount() { + String countShardAction = CountAction.NAME + "[s]"; + interceptTransportActions(countShardAction); + + CountRequest countRequest = new CountRequest(randomIndicesOrAliases()); + nodeClient.count(countRequest).actionGet(); + + assertSameIndices(countRequest, countShardAction); + } + + @Test + public void testExists() { + String existsShardAction = ExistsAction.NAME + "[s]"; + interceptTransportActions(existsShardAction); + + ExistsRequest existsRequest = new ExistsRequest(randomIndicesOrAliases()); + nodeClient.exists(existsRequest).actionGet(); + + assertSameIndices(existsRequest, existsShardAction); + } + + @Test + public void testFlush() { + String flushShardAction = FlushAction.NAME + "[s]"; + interceptTransportActions(flushShardAction); + + FlushRequest flushRequest = new FlushRequest(randomIndicesOrAliases()); + nodeClient.admin().indices().flush(flushRequest).actionGet(); + + assertSameIndices(flushRequest, flushShardAction); + } + + @Test + public void testOptimize() { + String optimizeShardAction = OptimizeAction.NAME + "[s]"; + interceptTransportActions(optimizeShardAction); + + OptimizeRequest optimizeRequest = new OptimizeRequest(randomIndicesOrAliases()); + nodeClient.admin().indices().optimize(optimizeRequest).actionGet(); + + assertSameIndices(optimizeRequest, optimizeShardAction); + } + + @Test + public void testRefresh() { + String refreshShardAction = RefreshAction.NAME + "[s]"; + interceptTransportActions(refreshShardAction); + + RefreshRequest refreshRequest = new RefreshRequest(randomIndicesOrAliases()); + nodeClient.admin().indices().refresh(refreshRequest).actionGet(); + + assertSameIndices(refreshRequest, refreshShardAction); + } + + @Test + public void testClearCache() { + String clearCacheAction = ClearIndicesCacheAction.NAME + "[s]"; + interceptTransportActions(clearCacheAction); + + ClearIndicesCacheRequest clearIndicesCacheRequest = new ClearIndicesCacheRequest(randomIndicesOrAliases()); + nodeClient.admin().indices().clearCache(clearIndicesCacheRequest).actionGet(); + + assertSameIndices(clearIndicesCacheRequest, clearCacheAction); + } + + @Test + public void testRecovery() { + String recoveryAction = RecoveryAction.NAME + "[s]"; + interceptTransportActions(recoveryAction); + + RecoveryRequest recoveryRequest = new RecoveryRequest(randomIndicesOrAliases()); + nodeClient.admin().indices().recoveries(recoveryRequest).actionGet(); + + assertSameIndices(recoveryRequest, recoveryAction); + } + + @Test + public void testSegments() { + String segmentsAction = IndicesSegmentsAction.NAME + "[s]"; + interceptTransportActions(segmentsAction); + + IndicesSegmentsRequest segmentsRequest = new IndicesSegmentsRequest(randomIndicesOrAliases()); + nodeClient.admin().indices().segments(segmentsRequest).actionGet(); + + assertSameIndices(segmentsRequest, segmentsAction); + } + + @Test + public void testIndicesStats() { + String indicesStats = IndicesStatsAction.NAME + "[s]"; + interceptTransportActions(indicesStats); + + IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest().indices(randomIndicesOrAliases()); + nodeClient.admin().indices().stats(indicesStatsRequest).actionGet(); + + assertSameIndices(indicesStatsRequest, indicesStats); + } + + @Test + public void testSuggest() { + String suggestAction = SuggestAction.NAME + "[s]"; + interceptTransportActions(suggestAction); + + SuggestRequest suggestRequest = new SuggestRequest(randomIndicesOrAliases()); + nodeClient.suggest(suggestRequest).actionGet(); + + assertSameIndices(suggestRequest, suggestAction); + } + + @Test + public void testValidateQuery() { + String validateQueryShardAction = ValidateQueryAction.NAME + "[s]"; + interceptTransportActions(validateQueryShardAction); + + ValidateQueryRequest validateQueryRequest = new ValidateQueryRequest(randomIndicesOrAliases()); + nodeClient.admin().indices().validateQuery(validateQueryRequest).actionGet(); + + assertSameIndices(validateQueryRequest, validateQueryShardAction); + } + + @Test + public void testPercolate() { + String percolateShardAction = PercolateAction.NAME + "[s]"; + interceptTransportActions(percolateShardAction); + + client().prepareIndex("test-get", "type", "1").setSource("field","value").get(); + + PercolateRequest percolateRequest = new PercolateRequest().indices(randomIndicesOrAliases()).documentType("type"); + if (randomBoolean()) { + percolateRequest.getRequest(new GetRequest("test-get", "type", "1")); + } else { + percolateRequest.source("\"field\":\"value\""); + } + nodeClient.percolate(percolateRequest).actionGet(); + + assertSameIndices(percolateRequest, percolateShardAction); + } + + @Test + public void testMultiPercolate() { + String multiPercolateShardAction = MultiPercolateAction.NAME + "[shard][s]"; + interceptTransportActions(multiPercolateShardAction); + + client().prepareIndex("test-get", "type", "1").setSource("field", "value").get(); + + MultiPercolateRequest multiPercolateRequest = new MultiPercolateRequest(); + List indices = new ArrayList<>(); + int numRequests = iterations(1, 30); + for (int i = 0; i < numRequests; i++) { + String[] indicesOrAliases = randomIndicesOrAliases(); + Collections.addAll(indices, indicesOrAliases); + PercolateRequest percolateRequest = new PercolateRequest().indices(indicesOrAliases).documentType("type"); + if (randomBoolean()) { + percolateRequest.getRequest(new GetRequest("test-get", "type", "1")); + } else { + percolateRequest.source("\"field\":\"value\""); + } + multiPercolateRequest.add(percolateRequest); + } + + nodeClient.multiPercolate(multiPercolateRequest).actionGet(); + + assertIndicesSubset(indices, multiPercolateShardAction); + } + + @Test + public void testOpenIndex() { + interceptTransportActions(OpenIndexAction.NAME); + + OpenIndexRequest openIndexRequest = new OpenIndexRequest(randomUniqueIndicesOrAliases()); + nodeClient.admin().indices().open(openIndexRequest).actionGet(); + + assertSameIndices(openIndexRequest, OpenIndexAction.NAME); + } + + @Test + public void testCloseIndex() { + interceptTransportActions(CloseIndexAction.NAME); + + CloseIndexRequest closeIndexRequest = new CloseIndexRequest(randomUniqueIndicesOrAliases()); + nodeClient.admin().indices().close(closeIndexRequest).actionGet(); + + assertSameIndices(closeIndexRequest, CloseIndexAction.NAME); + } + + @Test + public void testDeleteIndex() { + interceptTransportActions(DeleteIndexAction.NAME); + + String[] randomIndicesOrAliases = randomUniqueIndicesOrAliases(); + DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(randomIndicesOrAliases); + assertAcked(nodeClient.admin().indices().delete(deleteIndexRequest).actionGet()); + + assertSameIndices(deleteIndexRequest, DeleteIndexAction.NAME); + + //explicitly cleanup otherwise the delete index after test gets intercepted too and assertAllRequestsHaveBeenConsumed fails + clearInterceptedActions(); + } + + @Test + public void testGetMappings() { + interceptTransportActions(GetMappingsAction.NAME); + + GetMappingsRequest getMappingsRequest = new GetMappingsRequest().indices(randomIndicesOrAliases()); + nodeClient.admin().indices().getMappings(getMappingsRequest).actionGet(); + + assertSameIndices(getMappingsRequest, GetMappingsAction.NAME); + } + + @Test + public void testPutMapping() { + interceptTransportActions(PutMappingAction.NAME); + + PutMappingRequest putMappingRequest = new PutMappingRequest(randomUniqueIndicesOrAliases()).type("type").source("field", "type=string"); + nodeClient.admin().indices().putMapping(putMappingRequest).actionGet(); + + assertSameIndices(putMappingRequest, PutMappingAction.NAME); + } + + @Test + public void testDeleteMapping() { + interceptTransportActions(DeleteMappingAction.NAME); + + String[] indices = randomUniqueIndicesOrAliases(); + client().admin().indices().putMapping(new PutMappingRequest(indices).type("type").source("field", "type=string")).actionGet(); + DeleteMappingRequest deleteMappingRequest = new DeleteMappingRequest(indices).types("type"); + nodeClient.admin().indices().deleteMapping(deleteMappingRequest).actionGet(); + + assertSameIndices(deleteMappingRequest, DeleteMappingAction.NAME); + } + + @Test + public void testGetSettings() { + interceptTransportActions(GetSettingsAction.NAME); + + GetSettingsRequest getSettingsRequest = new GetSettingsRequest().indices(randomIndicesOrAliases()); + nodeClient.admin().indices().getSettings(getSettingsRequest).actionGet(); + + assertSameIndices(getSettingsRequest, GetSettingsAction.NAME); + } + + @Test + public void testUpdateSettings() { + interceptTransportActions(UpdateSettingsAction.NAME); + + UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(randomIndicesOrAliases()).settings(ImmutableSettings.builder().put("refresh_interval", -1)); + nodeClient.admin().indices().updateSettings(updateSettingsRequest).actionGet(); + + assertSameIndices(updateSettingsRequest, UpdateSettingsAction.NAME); + } + + @Test + public void testSearchQueryThenFetch() throws Exception { + interceptTransportActions(SearchServiceTransportAction.QUERY_ACTION_NAME, + SearchServiceTransportAction.FETCH_ID_ACTION_NAME, SearchServiceTransportAction.FREE_CONTEXT_ACTION_NAME); + + String[] randomIndicesOrAliases = randomIndicesOrAliases(); + for (int i = 0; i < randomIndicesOrAliases.length; i++) { + client().prepareIndex(randomIndicesOrAliases[i], "type", "id-" + i).setSource("field", "value").get(); + } + refresh(); + + SearchRequest searchRequest = new SearchRequest(randomIndicesOrAliases).searchType(SearchType.QUERY_THEN_FETCH); + SearchResponse searchResponse = nodeClient.search(searchRequest).actionGet(); + assertNoFailures(searchResponse); + assertThat(searchResponse.getHits().totalHits(), greaterThan(0l)); + + //explicitly stop intercepting requests since free context is async hence it might keep coming + //after the checks and make assertAllRequestsHaveBeenConsumed fail + clearInterceptedActions(); + + assertSameIndices(searchRequest, SearchServiceTransportAction.QUERY_ACTION_NAME, SearchServiceTransportAction.FETCH_ID_ACTION_NAME); + //free context messages are not necessarily sent, but if they are, check their indices + assertSameIndicesOptionalRequests(searchRequest, SearchServiceTransportAction.FREE_CONTEXT_ACTION_NAME); + } + + @Test + public void testSearchDfsQueryThenFetch() throws Exception { + interceptTransportActions(SearchServiceTransportAction.DFS_ACTION_NAME, SearchServiceTransportAction.QUERY_ID_ACTION_NAME, + SearchServiceTransportAction.FETCH_ID_ACTION_NAME, SearchServiceTransportAction.FREE_CONTEXT_ACTION_NAME); + + String[] randomIndicesOrAliases = randomIndicesOrAliases(); + for (int i = 0; i < randomIndicesOrAliases.length; i++) { + client().prepareIndex(randomIndicesOrAliases[i], "type", "id-" + i).setSource("field", "value").get(); + } + refresh(); + + SearchRequest searchRequest = new SearchRequest(randomIndicesOrAliases).searchType(SearchType.DFS_QUERY_THEN_FETCH); + SearchResponse searchResponse = nodeClient.search(searchRequest).actionGet(); + assertNoFailures(searchResponse); + assertThat(searchResponse.getHits().totalHits(), greaterThan(0l)); + + //explicitly stop intercepting requests since free context is async hence it might keep coming + //after the checks and make assertAllRequestsHaveBeenConsumed fail + clearInterceptedActions(); + + assertSameIndices(searchRequest, SearchServiceTransportAction.DFS_ACTION_NAME, SearchServiceTransportAction.QUERY_ID_ACTION_NAME, + SearchServiceTransportAction.FETCH_ID_ACTION_NAME); + //free context messages are not necessarily sent, but if they are, check their indices + assertSameIndicesOptionalRequests(searchRequest, SearchServiceTransportAction.FREE_CONTEXT_ACTION_NAME); + } + + @Test + public void testSearchQueryAndFetch() throws Exception { + interceptTransportActions(SearchServiceTransportAction.QUERY_FETCH_ACTION_NAME, + SearchServiceTransportAction.FREE_CONTEXT_ACTION_NAME); + + String[] randomIndicesOrAliases = randomIndicesOrAliases(); + for (int i = 0; i < randomIndicesOrAliases.length; i++) { + client().prepareIndex(randomIndicesOrAliases[i], "type", "id-" + i).setSource("field", "value").get(); + } + refresh(); + + SearchRequest searchRequest = new SearchRequest(randomIndicesOrAliases).searchType(SearchType.QUERY_AND_FETCH); + SearchResponse searchResponse = nodeClient.search(searchRequest).actionGet(); + assertNoFailures(searchResponse); + assertThat(searchResponse.getHits().totalHits(), greaterThan(0l)); + + //explicitly stop intercepting requests since free context is async hence it might keep coming + //after the checks and make assertAllRequestsHaveBeenConsumed fail + clearInterceptedActions(); + + assertSameIndices(searchRequest, SearchServiceTransportAction.QUERY_FETCH_ACTION_NAME); + //free context messages are not necessarily sent, but if they are, check their indices + assertSameIndicesOptionalRequests(searchRequest, SearchServiceTransportAction.FREE_CONTEXT_ACTION_NAME); + } + + @Test + public void testSearchDfsQueryAndFetch() throws Exception { + interceptTransportActions(SearchServiceTransportAction.QUERY_QUERY_FETCH_ACTION_NAME, + SearchServiceTransportAction.FREE_CONTEXT_ACTION_NAME); + + String[] randomIndicesOrAliases = randomIndicesOrAliases(); + for (int i = 0; i < randomIndicesOrAliases.length; i++) { + client().prepareIndex(randomIndicesOrAliases[i], "type", "id-" + i).setSource("field", "value").get(); + } + refresh(); + + SearchRequest searchRequest = new SearchRequest(randomIndicesOrAliases).searchType(SearchType.DFS_QUERY_AND_FETCH); + SearchResponse searchResponse = nodeClient.search(searchRequest).actionGet(); + assertNoFailures(searchResponse); + assertThat(searchResponse.getHits().totalHits(), greaterThan(0l)); + + //explicitly stop intercepting requests since free context is async hence it might keep coming + //after the checks and make assertAllRequestsHaveBeenConsumed fail + clearInterceptedActions(); + + assertSameIndices(searchRequest, SearchServiceTransportAction.QUERY_QUERY_FETCH_ACTION_NAME); + //free context messages are not necessarily sent, but if they are, check their indices + assertSameIndicesOptionalRequests(searchRequest, SearchServiceTransportAction.FREE_CONTEXT_ACTION_NAME); + } + + @Test + public void testSearchScan() throws Exception { + interceptTransportActions(SearchServiceTransportAction.SCAN_ACTION_NAME); + + String[] randomIndicesOrAliases = randomIndicesOrAliases(); + for (int i = 0; i < randomIndicesOrAliases.length; i++) { + client().prepareIndex(randomIndicesOrAliases[i], "type", "id-" + i).setSource("field", "value").get(); + } + refresh(); + + SearchRequest searchRequest = new SearchRequest(randomIndicesOrAliases).searchType(SearchType.SCAN).scroll(new TimeValue(500)); + SearchResponse searchResponse = nodeClient.search(searchRequest).actionGet(); + assertNoFailures(searchResponse); + assertThat(searchResponse.getHits().totalHits(), greaterThan(0l)); + + client().prepareClearScroll().addScrollId(searchResponse.getScrollId()).get(); + + //explicitly stop intercepting requests since free context is async hence it might keep coming + //after the checks and make assertAllRequestsHaveBeenConsumed fail + clearInterceptedActions(); + + assertSameIndices(searchRequest, SearchServiceTransportAction.SCAN_ACTION_NAME); + } + + @Test + public void testMoreLikeThis() { + interceptTransportActions(GetAction.NAME + "[s]", SearchServiceTransportAction.QUERY_ACTION_NAME, + SearchServiceTransportAction.FETCH_ID_ACTION_NAME, SearchServiceTransportAction.FREE_CONTEXT_ACTION_NAME); + + String[] randomIndicesOrAliases = randomIndicesOrAliases(); + for (int i = 0; i < randomIndicesOrAliases.length; i++) { + client().prepareIndex(randomIndicesOrAliases[i], "type", "id-" + i).setSource("field", "value").get(); + } + refresh(); + + assertAcked(prepareCreate("test-get").addAlias(new Alias("alias-get"))); + client().prepareIndex("test-get", "type", "1").setSource("field","value").get(); + String indexGet = randomBoolean() ? "test-get" : "alias-get"; + + MoreLikeThisRequest moreLikeThisRequest = new MoreLikeThisRequest(indexGet).type("type").id("1") + .searchIndices(randomIndicesOrAliases()); + nodeClient.moreLikeThis(moreLikeThisRequest).actionGet(); + + //explicitly stop intercepting requests since free context is async hence it might keep coming + //after the checks and make assertAllRequestsHaveBeenConsumed fail + clearInterceptedActions(); + + //get might end up being executed locally, only optionally over the transport + assertSameIndicesOptionalRequests(new String[]{indexGet}, GetAction.NAME + "[s]"); + //query might end up being executed locally as well, only optionally over the transport + assertSameIndicesOptionalRequests(moreLikeThisRequest.searchIndices(), SearchServiceTransportAction.QUERY_ACTION_NAME); + //free context messages are not necessarily sent through the transport, but if they are, check their indices + assertSameIndicesOptionalRequests(moreLikeThisRequest.searchIndices(), SearchServiceTransportAction.FETCH_ID_ACTION_NAME, SearchServiceTransportAction.FREE_CONTEXT_ACTION_NAME); + } + + private static void assertSameIndices(IndicesRequest originalRequest, String... actions) { + assertSameIndices(originalRequest, false, actions); + } + + private static void assertSameIndicesOptionalRequests(IndicesRequest originalRequest, String... actions) { + assertSameIndices(originalRequest, true, actions); + } + + private static void assertSameIndices(IndicesRequest originalRequest, boolean optional, String... actions) { + for (String action : actions) { + List requests = consumeTransportRequests(action); + if (!optional) { + assertThat("no internal requests intercepted for action [" + action + "]", requests.size(), greaterThan(0)); + } + for (TransportRequest internalRequest : requests) { + assertThat(internalRequest, instanceOf(IndicesRequest.class)); + assertThat(internalRequest.getClass().getName(), ((IndicesRequest)internalRequest).indices(), equalTo(originalRequest.indices())); + assertThat(((IndicesRequest)internalRequest).indicesOptions(), equalTo(originalRequest.indicesOptions())); + } + } + } + + private static void assertSameIndices(String[] indices, String... actions) { + assertSameIndices(indices, false, actions); + } + + private static void assertSameIndicesOptionalRequests(String[] indices, String... actions) { + assertSameIndices(indices, true, actions); + } + + private static void assertSameIndices(String[] indices, boolean optional, String... actions) { + for (String action : actions) { + List requests = consumeTransportRequests(action); + if (!optional) { + assertThat("no internal requests intercepted for action [" + action + "]", requests.size(), greaterThan(0)); + } + for (TransportRequest internalRequest : requests) { + assertThat(internalRequest, instanceOf(IndicesRequest.class)); + assertThat(internalRequest.getClass().getName(), ((IndicesRequest)internalRequest).indices(), equalTo(indices)); + } + } + } + + private static void assertIndicesSubset(List indices, String... actions) { + //indices returned by each bulk shard request need to be a subset of the original indices + for (String action : actions) { + List requests = consumeTransportRequests(action); + assertThat("no internal requests intercepted for action [" + action + "]", requests.size(), greaterThan(0)); + for (TransportRequest internalRequest : requests) { + assertThat(internalRequest, instanceOf(IndicesRequest.class)); + for (String index : ((IndicesRequest) internalRequest).indices()) { + assertThat(indices, hasItem(index)); + } + } + } + } + + private String randomIndexOrAlias() { + String index = randomFrom(indices); + if (randomBoolean()) { + return index + "-alias"; + } else { + return index; + } + } + + private String[] randomIndicesOrAliases() { + int count = randomIntBetween(1, indices.size() * 2); //every index has an alias + String[] indices = new String[count]; + for (int i = 0; i < count; i++) { + indices[i] = randomIndexOrAlias(); + } + return indices; + } + + private String[] randomUniqueIndicesOrAliases() { + Set uniqueIndices = new HashSet<>(); + int count = randomIntBetween(1, this.indices.size()); + while (uniqueIndices.size() < count) { + uniqueIndices.add(randomFrom(this.indices)); + } + String[] indices = new String[count]; + int i = 0; + for (String index : uniqueIndices) { + indices[i++] = randomBoolean() ? index + "-alias" : index; + } + return indices; + } + + private static void assertAllRequestsHaveBeenConsumed() { + Iterable transportServices = internalCluster().getInstances(TransportService.class); + for (TransportService transportService : transportServices) { + assertThat(((InterceptingTransportService)transportService).requests.isEmpty(), equalTo(true)); + } + } + + private static void clearInterceptedActions() { + Iterable transportServices = internalCluster().getInstances(TransportService.class); + for (TransportService transportService : transportServices) { + ((InterceptingTransportService) transportService).clearInterceptedActions(); + } + } + + private static void interceptTransportActions(String... actions) { + Iterable transportServices = internalCluster().getInstances(TransportService.class); + for (TransportService transportService : transportServices) { + ((InterceptingTransportService) transportService).interceptTransportActions(actions); + } + } + + private static List consumeTransportRequests(String action) { + List requests = new ArrayList<>(); + Iterable transportServices = internalCluster().getInstances(TransportService.class); + for (TransportService transportService : transportServices) { + List transportRequests = ((InterceptingTransportService) transportService).consumeRequests(action); + if (transportRequests != null) { + requests.addAll(transportRequests); + } + } + return requests; + } + + public static class InterceptingTransportService extends TransportService { + + private final Set actions = new CopyOnWriteArraySet<>(); + + private final ConcurrentMap> requests = new ConcurrentHashMap<>(); + + @Inject + public InterceptingTransportService(Settings settings, Transport transport, ThreadPool threadPool) { + super(settings, transport, threadPool); + } + + List consumeRequests(String action) { + return requests.remove(action); + } + + void interceptTransportActions(String... actions) { + Collections.addAll(this.actions, actions); + } + + void clearInterceptedActions() { + actions.clear(); + } + + @Override + public void registerHandler(String action, TransportRequestHandler handler) { + super.registerHandler(action, new InterceptingRequestHandler(action, handler)); + } + + private class InterceptingRequestHandler implements TransportRequestHandler { + + private final TransportRequestHandler requestHandler; + private final String action; + + InterceptingRequestHandler(String action, TransportRequestHandler requestHandler) { + this.requestHandler = requestHandler; + this.action = action; + } + + @Override + public TransportRequest newInstance() { + return requestHandler.newInstance(); + } + + @Override + @SuppressWarnings("unchecked") + public void messageReceived(TransportRequest request, TransportChannel channel) throws Exception { + if (actions.contains(action)) { + List transportRequests = requests.putIfAbsent(action, Lists.newArrayList(request)); + if (transportRequests != null) { + transportRequests.add(request); + } + } + requestHandler.messageReceived(request, channel); + } + + @Override + public String executor() { + return requestHandler.executor(); + } + + @Override + public boolean isForceExecution() { + return requestHandler.isForceExecution(); + } + } + } +} diff --git a/src/test/java/org/elasticsearch/action/OriginalIndicesTests.java b/src/test/java/org/elasticsearch/action/OriginalIndicesTests.java new file mode 100644 index 00000000000..d2fe61362c6 --- /dev/null +++ b/src/test/java/org/elasticsearch/action/OriginalIndicesTests.java @@ -0,0 +1,103 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action; + +import org.elasticsearch.Version; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.common.io.stream.BytesStreamInput; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.test.ElasticsearchTestCase; +import org.junit.Test; + +import java.io.IOException; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.nullValue; + +public class OriginalIndicesTests extends ElasticsearchTestCase { + + private static final IndicesOptions[] indicesOptionsValues = new IndicesOptions[]{ + IndicesOptions.lenientExpandOpen() , IndicesOptions.strictExpand(), IndicesOptions.strictExpandOpen(), + IndicesOptions.strictExpandOpenAndForbidClosed(), IndicesOptions.strictSingleIndexNoExpandForbidClosed()}; + + @Test + public void testOriginalIndicesSerialization() throws IOException { + int iterations = iterations(10, 30); + for (int i = 0; i < iterations; i++) { + OriginalIndices originalIndices = randomOriginalIndices(); + + BytesStreamOutput out = new BytesStreamOutput(); + out.setVersion(randomVersion()); + OriginalIndices.writeOriginalIndices(originalIndices, out); + + BytesStreamInput in = new BytesStreamInput(out.bytes()); + in.setVersion(out.getVersion()); + OriginalIndices originalIndices2 = OriginalIndices.readOriginalIndices(in); + + if (out.getVersion().onOrAfter(Version.V_1_4_0)) { + assertThat(originalIndices2.indices(), equalTo(originalIndices.indices())); + assertThat(originalIndices2.indicesOptions(), equalTo(originalIndices.indicesOptions())); + } else { + assertThat(originalIndices2.indices(), nullValue()); + assertThat(originalIndices2.indicesOptions(), nullValue()); + } + } + } + + @Test + public void testOptionalOriginalIndicesSerialization() throws IOException { + int iterations = iterations(10, 30); + for (int i = 0; i < iterations; i++) { + OriginalIndices originalIndices; + boolean missing = randomBoolean(); + if (missing) { + originalIndices = randomOriginalIndices(); + } else { + originalIndices = OriginalIndices.EMPTY; + } + + BytesStreamOutput out = new BytesStreamOutput(); + out.setVersion(randomVersion()); + OriginalIndices.writeOptionalOriginalIndices(originalIndices, out); + + BytesStreamInput in = new BytesStreamInput(out.bytes()); + in.setVersion(out.getVersion()); + OriginalIndices originalIndices2 = OriginalIndices.readOptionalOriginalIndices(in); + + if (out.getVersion().onOrAfter(Version.V_1_4_0)) { + assertThat(originalIndices2.indices(), equalTo(originalIndices.indices())); + assertThat(originalIndices2.indicesOptions(), equalTo(originalIndices.indicesOptions())); + } else { + assertThat(originalIndices2.indices(), nullValue()); + assertThat(originalIndices2.indicesOptions(), nullValue()); + } + } + } + + private static OriginalIndices randomOriginalIndices() { + int numIndices = randomInt(10); + String[] indices = new String[numIndices]; + for (int j = 0; j < indices.length; j++) { + indices[j] = randomAsciiOfLength(randomIntBetween(1, 10)); + } + IndicesOptions indicesOptions = randomFrom(indicesOptionsValues); + return new OriginalIndices(indices, indicesOptions); + } +} diff --git a/src/test/java/org/elasticsearch/action/get/MultiGetShardRequestTests.java b/src/test/java/org/elasticsearch/action/get/MultiGetShardRequestTests.java new file mode 100644 index 00000000000..2a7f1e83514 --- /dev/null +++ b/src/test/java/org/elasticsearch/action/get/MultiGetShardRequestTests.java @@ -0,0 +1,113 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.get; + +import org.elasticsearch.Version; +import org.elasticsearch.common.io.stream.BytesStreamInput; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.index.VersionType; +import org.elasticsearch.search.fetch.source.FetchSourceContext; +import org.elasticsearch.test.ElasticsearchTestCase; +import org.junit.Test; + +import java.io.IOException; + +import static org.hamcrest.CoreMatchers.equalTo; + +public class MultiGetShardRequestTests extends ElasticsearchTestCase { + + @Test + public void testSerialization() throws IOException { + MultiGetRequest multiGetRequest = new MultiGetRequest(); + if (randomBoolean()) { + multiGetRequest.preference(randomAsciiOfLength(randomIntBetween(1, 10))); + } + if (randomBoolean()) { + multiGetRequest.realtime(false); + } + if (randomBoolean()) { + multiGetRequest.refresh(true); + } + multiGetRequest.ignoreErrorsOnGeneratedFields(randomBoolean()); + + MultiGetShardRequest multiGetShardRequest = new MultiGetShardRequest(multiGetRequest, "index", 0); + int numItems = iterations(10, 30); + for (int i = 0; i < numItems; i++) { + MultiGetRequest.Item item = new MultiGetRequest.Item("alias-" + randomAsciiOfLength(randomIntBetween(1, 10)), "type", "id-" + i); + if (randomBoolean()) { + int numFields = randomIntBetween(1, 5); + String[] fields = new String[numFields]; + for (int j = 0; j < fields.length; j++) { + fields[j] = randomAsciiOfLength(randomIntBetween(1, 10)); + } + item.fields(fields); + } + if (randomBoolean()) { + item.version(randomIntBetween(1, Integer.MAX_VALUE)); + item.versionType(randomFrom(VersionType.values())); + } + if (randomBoolean()) { + item.fetchSourceContext(new FetchSourceContext(randomBoolean())); + } + multiGetShardRequest.add(0, item); + } + + BytesStreamOutput out = new BytesStreamOutput(); + out.setVersion(randomVersion()); + multiGetShardRequest.writeTo(out); + + BytesStreamInput in = new BytesStreamInput(out.bytes()); + in.setVersion(out.getVersion()); + MultiGetShardRequest multiGetShardRequest2 = new MultiGetShardRequest(); + multiGetShardRequest2.readFrom(in); + + assertThat(multiGetShardRequest2.index(), equalTo(multiGetShardRequest.index())); + assertThat(multiGetShardRequest2.preference(), equalTo(multiGetShardRequest.preference())); + assertThat(multiGetShardRequest2.realtime(), equalTo(multiGetShardRequest.realtime())); + assertThat(multiGetShardRequest2.refresh(), equalTo(multiGetShardRequest.refresh())); + if (in.getVersion().onOrAfter(Version.V_1_4_0)) { + assertThat(multiGetShardRequest2.ignoreErrorsOnGeneratedFields(), equalTo(multiGetShardRequest.ignoreErrorsOnGeneratedFields())); + } else { + assertThat(multiGetShardRequest2.ignoreErrorsOnGeneratedFields(), equalTo(false)); + } + assertThat(multiGetShardRequest2.items.size(), equalTo(multiGetShardRequest.items.size())); + for (int i = 0; i < multiGetShardRequest2.items.size(); i++) { + MultiGetRequest.Item item = multiGetShardRequest.items.get(i); + MultiGetRequest.Item item2 = multiGetShardRequest2.items.get(i); + if (in.getVersion().onOrAfter(Version.V_1_4_0)) { + assertThat(item2.index(), equalTo(item.index())); + } else { + //before 1.4 we have only one index, the concrete one + assertThat(item2.index(), equalTo(multiGetShardRequest.index())); + } + assertThat(item2.type(), equalTo(item.type())); + assertThat(item2.id(), equalTo(item.id())); + assertThat(item2.fields(), equalTo(item.fields())); + assertThat(item2.version(), equalTo(item.version())); + assertThat(item2.versionType(), equalTo(item.versionType())); + assertThat(item2.fetchSourceContext(), equalTo(item.fetchSourceContext())); + } + if (in.getVersion().onOrAfter(Version.V_1_4_0)) { + //we don't serialize the original index before 1.4, it'll get the concrete one + assertThat(multiGetShardRequest2.indices(), equalTo(multiGetShardRequest.indices())); + assertThat(multiGetShardRequest2.indicesOptions(), equalTo(multiGetShardRequest.indicesOptions())); + } + } +} diff --git a/src/test/java/org/elasticsearch/bwcompat/BasicBackwardsCompatibilityTest.java b/src/test/java/org/elasticsearch/bwcompat/BasicBackwardsCompatibilityTest.java index ea096658bab..323e6b713e5 100644 --- a/src/test/java/org/elasticsearch/bwcompat/BasicBackwardsCompatibilityTest.java +++ b/src/test/java/org/elasticsearch/bwcompat/BasicBackwardsCompatibilityTest.java @@ -36,7 +36,7 @@ import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse; import org.elasticsearch.action.deletebyquery.IndexDeleteByQueryResponse; import org.elasticsearch.action.explain.ExplainResponse; -import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.get.*; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.index.IndexResponse; @@ -754,6 +754,39 @@ public class BasicBackwardsCompatibilityTest extends ElasticsearchBackwardsCompa assertThat(indicesStatsResponse.getIndices().containsKey("test"), equalTo(true)); } + @Test + public void testMultiGet() throws ExecutionException, InterruptedException { + assertAcked(prepareCreate("test").addAlias(new Alias("alias"))); + ensureGreen("test"); + + int numDocs = iterations(10, 50); + IndexRequestBuilder[] indexRequestBuilders = new IndexRequestBuilder[numDocs]; + for (int i = 0; i < numDocs; i++) { + indexRequestBuilders[i] = client().prepareIndex("test", "type", Integer.toString(i)).setSource("field", "value" + Integer.toString(i)); + } + indexRandom(false, indexRequestBuilders); + + int iterations = iterations(1, numDocs); + MultiGetRequestBuilder multiGetRequestBuilder = client().prepareMultiGet(); + for (int i = 0; i < iterations; i++) { + multiGetRequestBuilder.add(new MultiGetRequest.Item(indexOrAlias(), "type", Integer.toString(randomInt(numDocs - 1)))); + } + MultiGetResponse multiGetResponse = multiGetRequestBuilder.get(); + assertThat(multiGetResponse.getResponses().length, equalTo(iterations)); + for (int i = 0; i < multiGetResponse.getResponses().length; i++) { + MultiGetItemResponse multiGetItemResponse = multiGetResponse.getResponses()[i]; + assertThat(multiGetItemResponse.isFailed(), equalTo(false)); + assertThat(multiGetItemResponse.getIndex(), equalTo("test")); + assertThat(multiGetItemResponse.getType(), equalTo("type")); + assertThat(multiGetItemResponse.getId(), equalTo(multiGetRequestBuilder.request().getItems().get(i).id())); + assertThat(multiGetItemResponse.getResponse().isExists(), equalTo(true)); + assertThat(multiGetItemResponse.getResponse().getIndex(), equalTo("test")); + assertThat(multiGetItemResponse.getResponse().getType(), equalTo("type")); + assertThat(multiGetItemResponse.getResponse().getId(), equalTo(multiGetRequestBuilder.request().getItems().get(i).id())); + } + + } + private static String indexOrAlias() { return randomBoolean() ? "test" : "alias"; }