Internal: make sure that all shard level requests hold the original indices

A request that relates to indices (`IndicesRequest` or `CompositeIndicesRequest`) may get converted into one or more internal requests (e.g. shard-level requests) that are distributed across the cluster. Those internal requests carry only the concrete index they refer to, so it is no longer known which indices (or aliases or expressions) the original request related to.

This commit makes sure that the original indices are available as part of the shard-level requests, and makes those requests implement `IndicesRequest` as well.

Also, every internal request should be created by passing in the original request, so that the original headers, together with any original indices and options, get copied over to it. Corrected some places where this information was lost.

NOTE: for the bulk API and the other multi-item APIs (e.g. multi_get), the shard-level requests won't keep around the whole set of original indices, but only the ones that relate to the items sent to each shard. The important bit is that we keep the original names though, not only the concrete ones.
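
To make this concrete, here is a minimal sketch of what the pattern boils down to for a typical internal request (the class and field names are illustrative only, not code from this commit): the shard-level request captures an OriginalIndices from the external request when it is built, and delegates the IndicesRequest accessors to it.

    // Illustrative sketch; ShardFooRequest/FooRequest are made-up names.
    class ShardFooRequest extends TransportRequest implements IndicesRequest {

        private OriginalIndices originalIndices;

        ShardFooRequest(FooRequest original, ShardId shardId) {
            super(original);                                      // copies the original request headers
            this.originalIndices = new OriginalIndices(original); // keeps the original names and options
        }

        @Override
        public String[] indices() {
            return originalIndices.indices();                     // names/aliases/expressions as originally requested
        }

        @Override
        public IndicesOptions indicesOptions() {
            return originalIndices.indicesOptions();
        }
    }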

Closes #7319
javanna 2014-08-13 16:22:22 +02:00 committed by Luca Cavanna
parent 0234b5b9b4
commit 441c1c8268
44 changed files with 1731 additions and 172 deletions

View File

@ -24,6 +24,8 @@ import org.elasticsearch.action.support.IndicesOptions;
/**
* Needs to be implemented by all {@link org.elasticsearch.action.ActionRequest} subclasses that relate to
* one or more indices. Allows to retrieve which indices the action relates to.
* In case of internal requests originated during the distributed execution of an external request,
* they will still return the indices that the original request related to.
*/
public interface IndicesRequest {
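
The hunk above is truncated right after the declaration; for reference, the interface body boils down to two accessors (inferred from the implementations further down in this diff, not quoted from this file):

    String[] indices();

    IndicesOptions indicesOptions();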

View File

@ -0,0 +1,99 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
/**
* Used to keep track of original indices within internal (e.g. shard level) requests
*/
public class OriginalIndices implements IndicesRequest {
public static OriginalIndices EMPTY = new OriginalIndices();
private final String[] indices;
private final IndicesOptions indicesOptions;
private OriginalIndices() {
this.indices = null;
this.indicesOptions = null;
}
public OriginalIndices(IndicesRequest indicesRequest) {
this.indices = indicesRequest.indices();
this.indicesOptions = indicesRequest.indicesOptions();
}
public OriginalIndices(String[] indices, IndicesOptions indicesOptions) {
this.indices = indices;
this.indicesOptions = indicesOptions;
}
@Override
public String[] indices() {
return indices;
}
@Override
public IndicesOptions indicesOptions() {
return indicesOptions;
}
public static OriginalIndices readOptionalOriginalIndices(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(Version.V_1_4_0)) {
boolean empty = in.readBoolean();
if (!empty) {
return new OriginalIndices(in.readStringArray(), IndicesOptions.readIndicesOptions(in));
}
}
return OriginalIndices.EMPTY;
}
public static void writeOptionalOriginalIndices(OriginalIndices originalIndices, StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_1_4_0)) {
boolean empty = originalIndices == EMPTY;
out.writeBoolean(empty);
if (!empty) {
out.writeStringArrayNullable(originalIndices.indices);
originalIndices.indicesOptions.writeIndicesOptions(out);
}
}
}
public static OriginalIndices readOriginalIndices(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(Version.V_1_4_0)) {
return new OriginalIndices(in.readStringArray(), IndicesOptions.readIndicesOptions(in));
}
return OriginalIndices.EMPTY;
}
public static void writeOriginalIndices(OriginalIndices originalIndices, StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_1_4_0)) {
out.writeStringArrayNullable(originalIndices.indices);
originalIndices.indicesOptions.writeIndicesOptions(out);
}
}
}
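
A typical consumer of the helpers above looks like this (a hedged sketch: the enclosing request class is left out, but the calls mirror GetFieldMappingsIndexRequest and ShardDeleteByQueryRequest later in this diff):

    @Override
    public void readFrom(StreamInput in) throws IOException {
        super.readFrom(in);
        // ... request-specific fields ...
        // readOriginalIndices checks the stream version itself, so when talking to a
        // pre-1.4.0 node nothing is read and OriginalIndices.EMPTY is returned
        originalIndices = OriginalIndices.readOriginalIndices(in);
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        super.writeTo(out);
        // ... request-specific fields ...
        OriginalIndices.writeOriginalIndices(originalIndices, out);
    }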

View File

@ -19,6 +19,8 @@
package org.elasticsearch.action.admin.indices.mapping.get;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.single.custom.SingleCustomOperationRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
@ -33,10 +35,13 @@ class GetFieldMappingsIndexRequest extends SingleCustomOperationRequest<GetField
private String[] fields = Strings.EMPTY_ARRAY;
private String[] types = Strings.EMPTY_ARRAY;
private OriginalIndices originalIndices;
GetFieldMappingsIndexRequest() {
}
GetFieldMappingsIndexRequest(GetFieldMappingsRequest other, String index, boolean probablySingleFieldRequest) {
super(other);
this.preferLocal(other.local);
this.probablySingleFieldRequest = probablySingleFieldRequest;
this.includeDefaults = other.includeDefaults();
@ -44,6 +49,7 @@ class GetFieldMappingsIndexRequest extends SingleCustomOperationRequest<GetField
this.fields = other.fields();
assert index != null;
this.index(index);
this.originalIndices = new OriginalIndices(other);
}
public String[] types() {
@ -62,6 +68,16 @@ class GetFieldMappingsIndexRequest extends SingleCustomOperationRequest<GetField
return includeDefaults;
}
@Override
public String[] indices() {
return originalIndices.indices();
}
@Override
public IndicesOptions indicesOptions() {
return originalIndices.indicesOptions();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
@ -69,6 +85,7 @@ class GetFieldMappingsIndexRequest extends SingleCustomOperationRequest<GetField
out.writeStringArray(fields);
out.writeBoolean(includeDefaults);
out.writeBoolean(probablySingleFieldRequest);
OriginalIndices.writeOriginalIndices(originalIndices, out);
}
@Override
@ -83,6 +100,7 @@ class GetFieldMappingsIndexRequest extends SingleCustomOperationRequest<GetField
fields = in.readStringArray();
includeDefaults = in.readBoolean();
probablySingleFieldRequest = in.readBoolean();
originalIndices = OriginalIndices.readOriginalIndices(in);
}
@Override

View File

@ -134,7 +134,7 @@ public class TransportIndicesSegmentsAction extends TransportBroadcastOperationA
return new ShardSegments(indexShard.routingEntry(), indexShard.engine().segments());
}
static class IndexShardSegmentRequest extends BroadcastShardOperationRequest {
public static class IndexShardSegmentRequest extends BroadcastShardOperationRequest {
IndexShardSegmentRequest() {
}

View File

@ -200,7 +200,7 @@ public class TransportIndicesStatsAction extends TransportBroadcastOperationActi
return new ShardStats(indexShard, flags);
}
static class IndexShardStatsRequest extends BroadcastShardOperationRequest {
public static class IndexShardStatsRequest extends BroadcastShardOperationRequest {
// TODO if there are many indices, the request might hold a large indices array..., we don't really need to serialize it
IndicesStatsRequest request;

View File

@ -179,7 +179,7 @@ public class TransportValidateQueryAction extends TransportBroadcastOperationAct
String error = null;
DefaultSearchContext searchContext = new DefaultSearchContext(0,
new ShardSearchRequest().types(request.types()).nowInMillis(request.nowInMillis())
new ShardSearchRequest(request).types(request.types()).nowInMillis(request.nowInMillis())
.filteringAliases(request.filteringAliases()),
null, indexShard.acquireSearcher("validate_query"), indexService, indexShard,
scriptService, cacheRecycler, pageCacheRecycler, bigArrays

View File

@ -20,6 +20,7 @@
package org.elasticsearch.action.bulk;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
@ -43,6 +44,7 @@ public class BulkItemRequest implements Streamable {
}
public BulkItemRequest(int id, ActionRequest request) {
assert request instanceof IndicesRequest;
this.id = id;
this.request = request;
}
@ -55,6 +57,12 @@ public class BulkItemRequest implements Streamable {
return request;
}
public String index() {
IndicesRequest indicesRequest = (IndicesRequest) request;
assert indicesRequest.indices().length == 1;
return indicesRequest.indices()[0];
}
public static BulkItemRequest readBulkItem(StreamInput in) throws IOException {
BulkItemRequest item = new BulkItemRequest();
item.readFrom(in);

View File

@ -25,6 +25,8 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
*
@ -40,7 +42,8 @@ public class BulkShardRequest extends ShardReplicationOperationRequest<BulkShard
BulkShardRequest() {
}
BulkShardRequest(String index, int shardId, boolean refresh, BulkItemRequest[] items) {
BulkShardRequest(BulkRequest bulkRequest, String index, int shardId, boolean refresh, BulkItemRequest[] items) {
super(bulkRequest);
this.index = index;
this.shardId = shardId;
this.items = items;
@ -59,6 +62,17 @@ public class BulkShardRequest extends ShardReplicationOperationRequest<BulkShard
return items;
}
@Override
public String[] indices() {
List<String> indices = new ArrayList<>();
for (BulkItemRequest item : items) {
if (item != null) {
indices.add(item.index());
}
}
return indices.toArray(new String[indices.size()]);
}
/**
* Before we fork on a local thread, make sure we copy over the bytes if they are unsafe
*/

View File

@ -296,7 +296,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
for (Map.Entry<ShardId, List<BulkItemRequest>> entry : requestsByShard.entrySet()) {
final ShardId shardId = entry.getKey();
final List<BulkItemRequest> requests = entry.getValue();
BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId.index().name(), shardId.id(), bulkRequest.refresh(), requests.toArray(new BulkItemRequest[requests.size()]));
BulkShardRequest bulkShardRequest = new BulkShardRequest(bulkRequest, shardId.index().name(), shardId.id(), bulkRequest.refresh(), requests.toArray(new BulkItemRequest[requests.size()]));
bulkShardRequest.replicationType(bulkRequest.replicationType());
bulkShardRequest.consistencyLevel(bulkRequest.consistencyLevel());
bulkShardRequest.timeout(bulkRequest.timeout());

View File

@ -170,7 +170,7 @@ public class TransportCountAction extends TransportBroadcastOperationAction<Coun
SearchShardTarget shardTarget = new SearchShardTarget(clusterService.localNode().id(), request.shardId().getIndex(), request.shardId().id());
SearchContext context = new DefaultSearchContext(0,
new ShardSearchRequest().types(request.types())
new ShardSearchRequest(request).types(request.types())
.filteringAliases(request.filteringAliases())
.nowInMillis(request.nowInMillis()),
shardTarget, indexShard.acquireSearcher("count"), indexService, indexShard,

View File

@ -31,13 +31,16 @@ class IndexDeleteRequest extends IndexReplicationOperationRequest<IndexDeleteReq
private final String id;
private final boolean refresh;
private final long version;
private final String originalIndex;
IndexDeleteRequest(DeleteRequest request, String concreteIndex) {
super(concreteIndex, request.timeout(), request.replicationType(), request.consistencyLevel());
super(concreteIndex, request.timeout(), request.replicationType(), request.consistencyLevel(),
request.indices(), request.indicesOptions(), request);
this.type = request.type();
this.id = request.id();
this.refresh = request.refresh();
this.version = request.version();
this.originalIndex = request.index();
}
String type() {
@ -55,4 +58,8 @@ class IndexDeleteRequest extends IndexReplicationOperationRequest<IndexDeleteReq
long version() {
return this.version;
}
String originalIndex() {
return originalIndex;
}
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.action.delete;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.replication.ShardReplicationOperationRequest;
import org.elasticsearch.common.io.stream.StreamInput;
@ -39,6 +40,7 @@ public class ShardDeleteRequest extends ShardReplicationOperationRequest<ShardDe
private String id;
private boolean refresh = false;
private long version;
private String originalIndex;
ShardDeleteRequest(IndexDeleteRequest request, int shardId) {
super(request);
@ -51,6 +53,7 @@ public class ShardDeleteRequest extends ShardReplicationOperationRequest<ShardDe
timeout = request.timeout();
this.refresh = request.refresh();
this.version = request.version();
this.originalIndex = request.originalIndex();
}
ShardDeleteRequest() {
@ -92,6 +95,11 @@ public class ShardDeleteRequest extends ShardReplicationOperationRequest<ShardDe
return this.version;
}
@Override
public String[] indices() {
return new String[]{originalIndex};
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
@ -100,6 +108,9 @@ public class ShardDeleteRequest extends ShardReplicationOperationRequest<ShardDe
id = in.readString();
refresh = in.readBoolean();
version = Versions.readVersion(in);
if (in.getVersion().onOrAfter(Version.V_1_4_0)) {
originalIndex = in.readString();
}
}
@Override
@ -110,5 +121,8 @@ public class ShardDeleteRequest extends ShardReplicationOperationRequest<ShardDe
out.writeString(id);
out.writeBoolean(refresh);
Versions.writeVersion(version, out);
if (out.getVersion().onOrAfter(Version.V_1_4_0)) {
out.writeString(originalIndex);
}
}
}

View File

@ -40,7 +40,7 @@ class IndexDeleteByQueryRequest extends IndexReplicationOperationRequest<IndexDe
IndexDeleteByQueryRequest(DeleteByQueryRequest request, String index, @Nullable Set<String> routing, @Nullable String[] filteringAliases,
long nowInMillis) {
super(index, request.timeout(), request.replicationType(), request.consistencyLevel());
super(index, request.timeout(), request.replicationType(), request.consistencyLevel(), request.indices(), request.indicesOptions(), request);
this.source = request.source();
this.types = request.types();
this.routing = routing;

View File

@ -21,6 +21,8 @@ package org.elasticsearch.action.deletebyquery;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.replication.ShardReplicationOperationRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
@ -50,6 +52,8 @@ public class ShardDeleteByQueryRequest extends ShardReplicationOperationRequest<
private String[] filteringAliases;
private long nowInMillis;
private OriginalIndices originalIndices;
ShardDeleteByQueryRequest(IndexDeleteByQueryRequest request, int shardId) {
super(request);
this.index = request.index();
@ -62,6 +66,7 @@ public class ShardDeleteByQueryRequest extends ShardReplicationOperationRequest<
this.routing = request.routing();
filteringAliases = request.filteringAliases();
nowInMillis = request.nowInMillis();
this.originalIndices = new OriginalIndices(request);
}
ShardDeleteByQueryRequest() {
@ -100,6 +105,16 @@ public class ShardDeleteByQueryRequest extends ShardReplicationOperationRequest<
return nowInMillis;
}
@Override
public String[] indices() {
return originalIndices.indices();
}
@Override
public IndicesOptions indicesOptions() {
return originalIndices.indicesOptions();
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
@ -126,6 +141,7 @@ public class ShardDeleteByQueryRequest extends ShardReplicationOperationRequest<
} else {
nowInMillis = System.currentTimeMillis();
}
originalIndices = OriginalIndices.readOriginalIndices(in);
}
@Override
@ -153,6 +169,7 @@ public class ShardDeleteByQueryRequest extends ShardReplicationOperationRequest<
if (out.getVersion().onOrAfter(Version.V_1_2_0)) {
out.writeVLong(nowInMillis);
}
OriginalIndices.writeOriginalIndices(originalIndices, out);
}
@Override

View File

@ -108,7 +108,7 @@ public class TransportShardDeleteByQueryAction extends TransportShardReplication
IndexService indexService = indicesService.indexServiceSafe(shardRequest.shardId.getIndex());
IndexShard indexShard = indexService.shardSafe(shardRequest.shardId.id());
SearchContext.setCurrent(new DefaultSearchContext(0, new ShardSearchRequest().types(request.types()).nowInMillis(request.nowInMillis()), null,
SearchContext.setCurrent(new DefaultSearchContext(0, new ShardSearchRequest(shardRequest.request).types(request.types()).nowInMillis(request.nowInMillis()), null,
indexShard.acquireSearcher(DELETE_BY_QUERY_API), indexService, indexShard, scriptService, cacheRecycler,
pageCacheRecycler, bigArrays));
try {
@ -130,7 +130,7 @@ public class TransportShardDeleteByQueryAction extends TransportShardReplication
IndexService indexService = indicesService.indexServiceSafe(shardRequest.shardId.getIndex());
IndexShard indexShard = indexService.shardSafe(shardRequest.shardId.id());
SearchContext.setCurrent(new DefaultSearchContext(0, new ShardSearchRequest().types(request.types()).nowInMillis(request.nowInMillis()), null,
SearchContext.setCurrent(new DefaultSearchContext(0, new ShardSearchRequest(shardRequest).types(request.types()).nowInMillis(request.nowInMillis()), null,
indexShard.acquireSearcher(DELETE_BY_QUERY_API, IndexShard.Mode.WRITE), indexService, indexShard, scriptService,
cacheRecycler, pageCacheRecycler, bigArrays));
try {

View File

@ -171,7 +171,7 @@ public class TransportExistsAction extends TransportBroadcastOperationAction<Exi
SearchShardTarget shardTarget = new SearchShardTarget(clusterService.localNode().id(), request.shardId().getIndex(), request.shardId().id());
SearchContext context = new DefaultSearchContext(0,
new ShardSearchRequest().types(request.types())
new ShardSearchRequest(request).types(request.types())
.filteringAliases(request.filteringAliases())
.nowInMillis(request.nowInMillis()),
shardTarget, indexShard.acquireSearcher("exists"), indexService, indexShard,

View File

@ -118,7 +118,7 @@ public class TransportExplainAction extends TransportShardSingleOperationAction<
SearchContext context = new DefaultSearchContext(
0,
new ShardSearchRequest().types(new String[]{request.type()})
new ShardSearchRequest(request).types(new String[]{request.type()})
.filteringAliases(request.filteringAlias())
.nowInMillis(request.nowInMillis),
null, result.searcher(), indexService, indexShard,

View File

@ -96,6 +96,14 @@ public class GetRequest extends SingleShardOperationRequest<GetRequest> {
this.type = "_all";
}
/**
* Constructs a new get request starting from the provided request, meaning that it will
* inherit its headers and context, and against the specified index.
*/
public GetRequest(ActionRequest request, String index) {
super(request, index);
}
/**
* Constructs a new get request against the specified index with the type and id.
*

View File

@ -23,7 +23,6 @@ import com.carrotsearch.hppc.IntArrayList;
import com.carrotsearch.hppc.LongArrayList;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.single.shard.SingleShardOperationRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lucene.uid.Versions;
@ -43,27 +42,21 @@ public class MultiGetShardRequest extends SingleShardOperationRequest<MultiGetSh
boolean ignoreErrorsOnGeneratedFields = false;
IntArrayList locations;
List<String> types;
List<String> ids;
List<String[]> fields;
LongArrayList versions;
List<VersionType> versionTypes;
List<FetchSourceContext> fetchSourceContexts;
List<MultiGetRequest.Item> items;
MultiGetShardRequest() {
}
MultiGetShardRequest(String index, int shardId) {
super(index);
MultiGetShardRequest(MultiGetRequest multiGetRequest, String index, int shardId) {
super(multiGetRequest, index);
this.shardId = shardId;
locations = new IntArrayList();
types = new ArrayList<>();
ids = new ArrayList<>();
fields = new ArrayList<>();
versions = new LongArrayList();
versionTypes = new ArrayList<>();
fetchSourceContexts = new ArrayList<>();
items = new ArrayList<>();
preference = multiGetRequest.preference;
realtime = multiGetRequest.realtime;
refresh = multiGetRequest.refresh;
ignoreErrorsOnGeneratedFields = multiGetRequest.ignoreErrorsOnGeneratedFields;
}
public int shardId() {
@ -107,14 +100,18 @@ public class MultiGetShardRequest extends SingleShardOperationRequest<MultiGetSh
return this;
}
public void add(int location, @Nullable String type, String id, String[] fields, long version, VersionType versionType, FetchSourceContext fetchSourceContext) {
void add(int location, MultiGetRequest.Item item) {
this.locations.add(location);
this.types.add(type);
this.ids.add(id);
this.fields.add(fields);
this.versions.add(version);
this.versionTypes.add(versionType);
this.fetchSourceContexts.add(fetchSourceContext);
this.items.add(item);
}
@Override
public String[] indices() {
String[] indices = new String[items.size()];
for (int i = 0; i < indices.length; i++) {
indices[i] = items.get(i).index();
}
return indices;
}
@Override
@ -122,34 +119,52 @@ public class MultiGetShardRequest extends SingleShardOperationRequest<MultiGetSh
super.readFrom(in);
int size = in.readVInt();
locations = new IntArrayList(size);
types = new ArrayList<>(size);
ids = new ArrayList<>(size);
fields = new ArrayList<>(size);
versions = new LongArrayList(size);
versionTypes = new ArrayList<>(size);
fetchSourceContexts = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
locations.add(in.readVInt());
if (in.readBoolean()) {
types.add(in.readSharedString());
} else {
types.add(null);
}
ids.add(in.readString());
int size1 = in.readVInt();
if (size1 > 0) {
String[] fields = new String[size1];
for (int j = 0; j < size1; j++) {
fields[j] = in.readString();
}
this.fields.add(fields);
} else {
fields.add(null);
}
versions.add(Versions.readVersionWithVLongForBW(in));
versionTypes.add(VersionType.fromValue(in.readByte()));
items = new ArrayList<>(size);
fetchSourceContexts.add(FetchSourceContext.optionalReadFromStream(in));
if (in.getVersion().onOrAfter(Version.V_1_4_0)) {
for (int i = 0; i < size; i++) {
locations.add(in.readVInt());
items.add(MultiGetRequest.Item.readItem(in));
}
} else {
List<String> types = new ArrayList<>(size);
List<String> ids = new ArrayList<>(size);
List<String[]> fields = new ArrayList<>(size);
LongArrayList versions = new LongArrayList(size);
List<VersionType> versionTypes = new ArrayList<>(size);
List<FetchSourceContext> fetchSourceContexts = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
locations.add(in.readVInt());
if (in.readBoolean()) {
types.add(in.readSharedString());
} else {
types.add(null);
}
ids.add(in.readString());
int size1 = in.readVInt();
if (size1 > 0) {
String[] fieldsArray = new String[size1];
for (int j = 0; j < size1; j++) {
fieldsArray[j] = in.readString();
}
fields.add(fieldsArray);
} else {
fields.add(null);
}
versions.add(Versions.readVersionWithVLongForBW(in));
versionTypes.add(VersionType.fromValue(in.readByte()));
fetchSourceContexts.add(FetchSourceContext.optionalReadFromStream(in));
}
for (int i = 0; i < size; i++) {
//before 1.4 we have only one index, the concrete one
MultiGetRequest.Item item = new MultiGetRequest.Item(index, types.get(i), ids.get(i))
.fields(fields.get(i)).version(versions.get(i)).versionType(versionTypes.get(i))
.fetchSourceContext(fetchSourceContexts.get(i));
items.add(item);
}
}
preference = in.readOptionalString();
@ -168,35 +183,43 @@ public class MultiGetShardRequest extends SingleShardOperationRequest<MultiGetSh
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(types.size());
for (int i = 0; i < types.size(); i++) {
out.writeVInt(locations.get(i));
if (types.get(i) == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeSharedString(types.get(i));
out.writeVInt(locations.size());
if (out.getVersion().onOrAfter(Version.V_1_4_0)) {
for (int i = 0; i < locations.size(); i++) {
out.writeVInt(locations.get(i));
items.get(i).writeTo(out);
}
out.writeString(ids.get(i));
if (fields.get(i) == null) {
out.writeVInt(0);
} else {
out.writeVInt(fields.get(i).length);
for (String field : fields.get(i)) {
out.writeString(field);
} else {
for (int i = 0; i < locations.size(); i++) {
out.writeVInt(locations.get(i));
MultiGetRequest.Item item = items.get(i);
if (item.type() == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeSharedString(item.type());
}
out.writeString(item.id());
if (item.fields() == null) {
out.writeVInt(0);
} else {
out.writeVInt(item.fields().length);
for (String field : item.fields()) {
out.writeString(field);
}
}
Versions.writeVersionWithVLongForBW(item.version(), out);
out.writeByte(item.versionType().getValue());
FetchSourceContext.optionalWriteToStream(item.fetchSourceContext(), out);
}
Versions.writeVersionWithVLongForBW(versions.get(i), out);
out.writeByte(versionTypes.get(i).getValue());
FetchSourceContext fetchSourceContext = fetchSourceContexts.get(i);
FetchSourceContext.optionalWriteToStream(fetchSourceContext, out);
}
out.writeOptionalString(preference);
out.writeBoolean(refresh);
if (realtime == null) {
out.writeByte((byte) -1);
} else if (realtime == false) {
} else if (!realtime) {
out.writeByte((byte) 0);
} else {
out.writeByte((byte) 1);

View File

@ -81,15 +81,10 @@ public class TransportMultiGetAction extends HandledTransportAction<MultiGetRequ
.getShards(clusterState, concreteSingleIndex, item.type(), item.id(), item.routing(), null).shardId();
MultiGetShardRequest shardRequest = shardRequests.get(shardId);
if (shardRequest == null) {
shardRequest = new MultiGetShardRequest(shardId.index().name(), shardId.id());
shardRequest.preference(request.preference);
shardRequest.realtime(request.realtime);
shardRequest.refresh(request.refresh);
shardRequest.ignoreErrorsOnGeneratedFields(request.ignoreErrorsOnGeneratedFields);
shardRequest = new MultiGetShardRequest(request, shardId.index().name(), shardId.id());
shardRequests.put(shardId, shardRequest);
}
shardRequest.add(i, item.type(), item.id(), item.fields(), item.version(), item.versionType(), item.fetchSourceContext());
shardRequest.add(i, item);
}
if (shardRequests.size() == 0) {
@ -116,8 +111,9 @@ public class TransportMultiGetAction extends HandledTransportAction<MultiGetRequ
// create failures for all relevant requests
String message = ExceptionsHelper.detailedMessage(e);
for (int i = 0; i < shardRequest.locations.size(); i++) {
MultiGetRequest.Item item = shardRequest.items.get(i);
responses.set(shardRequest.locations.get(i), new MultiGetItemResponse(null,
new MultiGetResponse.Failure(shardRequest.index(), shardRequest.types.get(i), shardRequest.ids.get(i), message)));
new MultiGetResponse.Failure(shardRequest.index(), item.type(), item.id(), message)));
}
if (counter.decrementAndGet() == 0) {
finishHim();

View File

@ -29,14 +29,12 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.search.fetch.source.FetchSourceContext;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@ -106,26 +104,16 @@ public class TransportShardMultiGetAction extends TransportShardSingleOperationA
MultiGetShardResponse response = new MultiGetShardResponse();
for (int i = 0; i < request.locations.size(); i++) {
String type = request.types.get(i);
String id = request.ids.get(i);
String[] fields = request.fields.get(i);
long version = request.versions.get(i);
VersionType versionType = request.versionTypes.get(i);
if (versionType == null) {
versionType = VersionType.INTERNAL;
}
FetchSourceContext fetchSourceContext = request.fetchSourceContexts.get(i);
MultiGetRequest.Item item = request.items.get(i);
try {
GetResult getResult = indexShard.getService().get(type, id, fields, request.realtime(), version, versionType, fetchSourceContext, request.ignoreErrorsOnGeneratedFields());
GetResult getResult = indexShard.getService().get(item.type(), item.id(), item.fields(), request.realtime(), item.version(), item.versionType(), item.fetchSourceContext(), request.ignoreErrorsOnGeneratedFields());
response.add(request.locations.get(i), new GetResponse(getResult));
} catch (Throwable t) {
if (TransportActions.isShardNotAvailableException(t)) {
throw (ElasticsearchException) t;
} else {
logger.debug("[{}][{}] failed to execute multi_get for [{}]/[{}]", t, request.index(), shardId, type, id);
response.add(request.locations.get(i), new MultiGetResponse.Failure(request.index(), type, id, ExceptionsHelper.detailedMessage(t)));
logger.debug("{} failed to execute multi_get for [{}]/[{}]", t, shardId, item.type(), item.id());
response.add(request.locations.get(i), new MultiGetResponse.Failure(request.index(), item.type(), item.id(), ExceptionsHelper.detailedMessage(t)));
}
}
}

View File

@ -33,7 +33,6 @@ import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.TransportSearchAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
@ -51,15 +50,15 @@ import org.elasticsearch.index.query.MoreLikeThisFieldQueryBuilder;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
import java.util.Collections;
import java.util.Iterator;
import java.util.Set;
import static com.google.common.collect.Sets.newHashSet;
import static org.elasticsearch.client.Requests.getRequest;
import static org.elasticsearch.client.Requests.searchRequest;
import static org.elasticsearch.index.query.QueryBuilders.*;
import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource;
@ -124,7 +123,7 @@ public class TransportMoreLikeThisAction extends HandledTransportAction<MoreLike
// add the source, in case we need to parse it to get fields
getFields.add(SourceFieldMapper.NAME);
GetRequest getRequest = getRequest(concreteIndex)
GetRequest getRequest = new GetRequest(request, request.index())
.fields(getFields.toArray(new String[getFields.size()]))
.type(request.type())
.id(request.id())
@ -204,7 +203,8 @@ public class TransportMoreLikeThisAction extends HandledTransportAction<MoreLike
if (searchTypes == null) {
searchTypes = new String[]{request.type()};
}
SearchRequest searchRequest = searchRequest(searchIndices)
SearchRequest searchRequest = new SearchRequest(request).indices(searchIndices)
.types(searchTypes)
.searchType(request.searchType())
.scroll(request.searchScroll())

View File

@ -20,6 +20,7 @@
package org.elasticsearch.action.percolate;
import org.elasticsearch.Version;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationRequest;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
@ -41,10 +42,6 @@ public class PercolateShardRequest extends BroadcastShardOperationRequest {
PercolateShardRequest() {
}
PercolateShardRequest(ShardId shardId) {
super(shardId);
}
PercolateShardRequest(ShardId shardId, int numberOfShards, PercolateRequest request) {
super(shardId, request);
this.documentType = request.documentType();
@ -54,6 +51,10 @@ public class PercolateShardRequest extends BroadcastShardOperationRequest {
this.numberOfShards = numberOfShards;
}
PercolateShardRequest(ShardId shardId, OriginalIndices originalIndices) {
super(shardId, originalIndices);
}
PercolateShardRequest(ShardId shardId, PercolateRequest request) {
super(shardId, request);
this.documentType = request.documentType();
@ -98,6 +99,10 @@ public class PercolateShardRequest extends BroadcastShardOperationRequest {
return numberOfShards;
}
OriginalIndices originalIndices() {
return originalIndices;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);

View File

@ -23,6 +23,7 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.action.support.single.shard.SingleShardOperationRequest;
@ -43,6 +44,7 @@ import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/**
@ -121,10 +123,10 @@ public class TransportShardMultiPercolateAction extends TransportShardSingleOper
private String preference;
private List<Item> items;
public Request() {
Request() {
}
public Request(String concreteIndex, int shardId, String preference) {
Request(String concreteIndex, int shardId, String preference) {
this.index = concreteIndex;
this.shardId = shardId;
this.preference = preference;
@ -133,7 +135,11 @@ public class TransportShardMultiPercolateAction extends TransportShardSingleOper
@Override
public String[] indices() {
return new String[]{index};
List<String> indices = new ArrayList<>();
for (Item item : items) {
Collections.addAll(indices, item.request.indices());
}
return indices.toArray(new String[indices.size()]);
}
public int shardId() {
@ -157,7 +163,8 @@ public class TransportShardMultiPercolateAction extends TransportShardSingleOper
items = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
int slot = in.readVInt();
PercolateShardRequest shardRequest = new PercolateShardRequest(new ShardId(index, shardId));
OriginalIndices originalIndices = OriginalIndices.readOriginalIndices(in);
PercolateShardRequest shardRequest = new PercolateShardRequest(new ShardId(index, shardId), originalIndices);
shardRequest.documentType(in.readString());
shardRequest.source(in.readBytesReference());
shardRequest.docSource(in.readBytesReference());
@ -175,6 +182,7 @@ public class TransportShardMultiPercolateAction extends TransportShardSingleOper
out.writeVInt(items.size());
for (Item item : items) {
out.writeVInt(item.slot);
OriginalIndices.writeOriginalIndices(item.request.originalIndices(), out);
out.writeString(item.request.documentType());
out.writeBytesReference(item.request.source());
out.writeBytesReference(item.request.docSource());
@ -182,7 +190,7 @@ public class TransportShardMultiPercolateAction extends TransportShardSingleOper
}
}
public static class Item {
static class Item {
private final int slot;
private final PercolateShardRequest request;

View File

@ -94,6 +94,14 @@ public class SearchRequest extends ActionRequest<SearchRequest> implements Indic
public SearchRequest() {
}
/**
* Constructs a new search request starting from the provided request, meaning that it will
* inherit its headers and context
*/
public SearchRequest(ActionRequest request) {
super(request);
}
/**
* Constructs a new search request against the indices. No indices provided here means that search
* will run against all indices.

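A usage note (hedged; it just restates what TransportMoreLikeThisAction does earlier in this diff): the new constructor lets an internal search request be built on top of the incoming request, so the original headers travel with it.

    SearchRequest searchRequest = new SearchRequest(request).indices(searchIndices)
            .types(searchTypes)
            .searchType(request.searchType());
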
View File

@ -39,7 +39,6 @@ import org.elasticsearch.search.fetch.FetchSearchRequest;
import org.elasticsearch.search.fetch.FetchSearchResult;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.query.QuerySearchResultProvider;
import org.elasticsearch.threadpool.ThreadPool;

View File

@ -20,6 +20,7 @@
package org.elasticsearch.action.support.broadcast;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -35,41 +36,47 @@ public abstract class BroadcastShardOperationRequest extends TransportRequest im
private ShardId shardId;
protected OriginalIndices originalIndices;
protected BroadcastShardOperationRequest() {
}
protected BroadcastShardOperationRequest(ShardId shardId, BroadcastOperationRequest request) {
super(request);
this.shardId = shardId;
this.originalIndices = new OriginalIndices(request);
}
protected BroadcastShardOperationRequest(ShardId shardId) {
protected BroadcastShardOperationRequest(ShardId shardId, OriginalIndices originalIndices) {
this.shardId = shardId;
}
@Override
public String[] indices() {
return new String[]{shardId.getIndex()};
}
@Override
public IndicesOptions indicesOptions() {
return IndicesOptions.strictSingleIndexNoExpandForbidClosed();
this.originalIndices = originalIndices;
}
public ShardId shardId() {
return this.shardId;
}
@Override
public String[] indices() {
return originalIndices.indices();
}
@Override
public IndicesOptions indicesOptions() {
return originalIndices.indicesOptions();
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
shardId = ShardId.readShardId(in);
originalIndices = OriginalIndices.readOriginalIndices(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
shardId.writeTo(out);
OriginalIndices.writeOriginalIndices(originalIndices, out);
}
}

View File

@ -19,10 +19,7 @@
package org.elasticsearch.action.support.replication;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.*;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -41,12 +38,16 @@ public abstract class IndexReplicationOperationRequest<T extends IndexReplicatio
private final String index;
private final ReplicationType replicationType;
private final WriteConsistencyLevel consistencyLevel;
private final OriginalIndices originalIndices;
protected IndexReplicationOperationRequest(String index, TimeValue timeout, ReplicationType replicationType, WriteConsistencyLevel consistencyLevel) {
protected IndexReplicationOperationRequest(String index, TimeValue timeout, ReplicationType replicationType, WriteConsistencyLevel consistencyLevel,
String[] originalIndices, IndicesOptions originalIndicesOptions, ActionRequest request) {
super(request);
this.index = index;
this.timeout = timeout;
this.replicationType = replicationType;
this.consistencyLevel = consistencyLevel;
this.originalIndices = new OriginalIndices(originalIndices, originalIndicesOptions);
}
@Override
@ -64,12 +65,12 @@ public abstract class IndexReplicationOperationRequest<T extends IndexReplicatio
@Override
public String[] indices() {
return new String[]{index};
return originalIndices.indices();
}
@Override
public IndicesOptions indicesOptions() {
return IndicesOptions.strictSingleIndexNoExpandForbidClosed();
return originalIndices.indicesOptions();
}
public ReplicationType replicationType() {

View File

@ -54,11 +54,11 @@ public abstract class ShardReplicationOperationRequest<T extends ShardReplicatio
}
public ShardReplicationOperationRequest(ActionRequest request) {
protected ShardReplicationOperationRequest(ActionRequest request) {
super(request);
}
public ShardReplicationOperationRequest(T request) {
protected ShardReplicationOperationRequest(T request) {
super(request);
this.timeout = request.timeout();
this.index = request.index();

View File

@ -23,11 +23,9 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.*;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.cluster.ClusterService;
@ -86,7 +84,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
this.indicesService = indicesService;
this.shardStateAction = shardStateAction;
this.transportReplicaAction = transportReplicaAction();
this.transportReplicaAction = actionName + "[r]";
this.executor = executor();
this.checkWriteConsistency = checkWriteConsistency();
@ -157,10 +155,6 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
return false;
}
private String transportReplicaAction() {
return actionName + "[r]";
}
protected boolean retryPrimaryException(Throwable e) {
return TransportActions.isShardNotAvailableException(e);
}
@ -261,20 +255,30 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
}
}
protected class ReplicaOperationRequest extends TransportRequest {
protected class ReplicaOperationRequest extends TransportRequest implements IndicesRequest {
public ShardId shardId;
public ReplicaRequest request;
public ReplicaOperationRequest() {
ReplicaOperationRequest() {
}
public ReplicaOperationRequest(ShardId shardId, ReplicaRequest request) {
ReplicaOperationRequest(ShardId shardId, ReplicaRequest request) {
super(request);
this.shardId = shardId;
this.request = request;
}
@Override
public String[] indices() {
return request.indices();
}
@Override
public IndicesOptions indicesOptions() {
return request.indicesOptions();
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);

View File

@ -40,6 +40,10 @@ public abstract class SingleCustomOperationRequest<T extends SingleCustomOperati
protected SingleCustomOperationRequest() {
}
protected SingleCustomOperationRequest(ActionRequest request) {
super(request);
}
@Override
public ActionRequestValidationException validate() {
return null;

View File

@ -22,8 +22,10 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
@ -307,7 +309,7 @@ public abstract class TransportSingleCustomOperationAction<Request extends Singl
}
}
protected class ShardSingleOperationRequest extends TransportRequest {
protected class ShardSingleOperationRequest extends TransportRequest implements IndicesRequest {
private Request request;
private ShardId shardId;
@ -325,6 +327,16 @@ public abstract class TransportSingleCustomOperationAction<Request extends Singl
return request;
}
@Override
public String[] indices() {
return request.indices();
}
@Override
public IndicesOptions indicesOptions() {
return request.indicesOptions();
}
public ShardId shardId() {
return shardId;
}

View File

@ -43,11 +43,16 @@ public abstract class SingleShardOperationRequest<T extends SingleShardOperation
protected SingleShardOperationRequest() {
}
protected SingleShardOperationRequest(String index) {
this.index = index;
}
protected SingleShardOperationRequest(ActionRequest request) {
super(request);
}
public SingleShardOperationRequest(String index) {
protected SingleShardOperationRequest(ActionRequest request, String index) {
super(request);
this.index = index;
}

View File

@ -23,8 +23,10 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.cluster.ClusterService;
@ -298,7 +300,7 @@ public abstract class TransportShardSingleOperationAction<Request extends Single
}
}
class ShardSingleOperationRequest extends TransportRequest {
class ShardSingleOperationRequest extends TransportRequest implements IndicesRequest {
private Request request;
@ -307,7 +309,7 @@ public abstract class TransportShardSingleOperationAction<Request extends Single
ShardSingleOperationRequest() {
}
public ShardSingleOperationRequest(Request request, ShardId shardId) {
ShardSingleOperationRequest(Request request, ShardId shardId) {
super(request);
this.request = request;
this.shardId = shardId;
@ -321,6 +323,16 @@ public abstract class TransportShardSingleOperationAction<Request extends Single
return shardId;
}
@Override
public String[] indices() {
return request.indices();
}
@Override
public IndicesOptions indicesOptions() {
return request.indicesOptions();
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);

View File

@ -40,8 +40,8 @@ public class MultiTermVectorsShardRequest extends SingleShardOperationRequest<Mu
}
MultiTermVectorsShardRequest(String index, int shardId) {
super(index);
MultiTermVectorsShardRequest(MultiTermVectorsRequest request, String index, int shardId) {
super(request, index);
this.shardId = shardId;
locations = new IntArrayList();
requests = new ArrayList<>();
@ -71,6 +71,15 @@ public class MultiTermVectorsShardRequest extends SingleShardOperationRequest<Mu
this.requests.add(request);
}
@Override
public String[] indices() {
String[] indices = new String[requests.size()];
for (int i = 0; i < indices.length; i++) {
indices[i] = requests.get(i).index();
}
return indices;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);

View File

@ -78,7 +78,7 @@ public class TransportMultiTermVectorsAction extends HandledTransportAction<Mult
termVectorRequest.type(), termVectorRequest.id(), termVectorRequest.routing(), null).shardId();
MultiTermVectorsShardRequest shardRequest = shardRequests.get(shardId);
if (shardRequest == null) {
shardRequest = new MultiTermVectorsShardRequest(shardId.index().name(), shardId.id());
shardRequest = new MultiTermVectorsShardRequest(request, shardId.index().name(), shardId.id());
shardRequest.preference(request.preference);
shardRequests.put(shardId, shardRequest);

View File

@ -21,8 +21,11 @@ package org.elasticsearch.search.action;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.AbstractComponent;
@ -547,32 +550,52 @@ public class SearchServiceTransportAction extends AbstractComponent {
}
}
static class SearchFreeContextRequest extends TransportRequest {
static class SearchFreeContextRequest extends TransportRequest implements IndicesRequest {
private long id;
private OriginalIndices originalIndices;
SearchFreeContextRequest() {
}
SearchFreeContextRequest(SearchRequest request, long id) {
super(request);
this.id = id;
this.originalIndices = new OriginalIndices(request);
}
SearchFreeContextRequest(TransportRequest request, long id) {
super(request);
this.id = id;
this.originalIndices = OriginalIndices.EMPTY;
}
public long id() {
return this.id;
}
@Override
public String[] indices() {
return originalIndices.indices();
}
@Override
public IndicesOptions indicesOptions() {
return originalIndices.indicesOptions();
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
id = in.readLong();
originalIndices = OriginalIndices.readOptionalOriginalIndices(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeLong(id);
OriginalIndices.writeOptionalOriginalIndices(originalIndices, out);
}
}

View File

@ -23,7 +23,12 @@ import com.carrotsearch.hppc.IntArrayList;
import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.ScoreDoc;
import org.elasticsearch.Version;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.search.type.ParsedScrollId;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lucene.Lucene;
@ -34,7 +39,7 @@ import java.io.IOException;
/**
*
*/
public class FetchSearchRequest extends TransportRequest {
public class FetchSearchRequest extends TransportRequest implements IndicesRequest {
private long id;
@ -44,19 +49,31 @@ public class FetchSearchRequest extends TransportRequest {
private ScoreDoc lastEmittedDoc;
private OriginalIndices originalIndices;
public FetchSearchRequest() {
}
public FetchSearchRequest(TransportRequest request, long id, IntArrayList list) {
public FetchSearchRequest(SearchRequest request, long id, IntArrayList list) {
this(request, id, list, null);
}
public FetchSearchRequest(TransportRequest request, long id, IntArrayList list, ScoreDoc lastEmittedDoc) {
public FetchSearchRequest(SearchRequest request, long id, IntArrayList list, ScoreDoc lastEmittedDoc) {
super(request);
this.id = id;
this.docIds = list.buffer;
this.size = list.size();
this.lastEmittedDoc = lastEmittedDoc;
this.originalIndices = new OriginalIndices(request);
}
public FetchSearchRequest(SearchScrollRequest request, long id, IntArrayList list, ScoreDoc lastEmittedDoc) {
super(request);
this.id = id;
this.docIds = list.buffer;
this.size = list.size();
this.lastEmittedDoc = lastEmittedDoc;
this.originalIndices = OriginalIndices.EMPTY;
}
public long id() {
@ -75,6 +92,16 @@ public class FetchSearchRequest extends TransportRequest {
return lastEmittedDoc;
}
@Override
public String[] indices() {
return originalIndices.indices();
}
@Override
public IndicesOptions indicesOptions() {
return originalIndices.indicesOptions();
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
@ -94,6 +121,7 @@ public class FetchSearchRequest extends TransportRequest {
throw new IOException("Unknown flag: " + flag);
}
}
originalIndices = OriginalIndices.readOptionalOriginalIndices(in);
}
@Override
@ -115,5 +143,6 @@ public class FetchSearchRequest extends TransportRequest {
Lucene.writeScoreDoc(out, lastEmittedDoc);
}
}
OriginalIndices.writeOptionalOriginalIndices(originalIndices, out);
}
}

View File

@ -21,6 +21,7 @@ package org.elasticsearch.search.internal;
import org.elasticsearch.Version;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.search.type.ParsedScrollId;
@ -84,7 +85,14 @@ public class ShardSearchRequest extends TransportRequest implements IndicesReque
private boolean useSlowScroll;
private OriginalIndices originalIndices;
public ShardSearchRequest() {
}
public ShardSearchRequest(TransportRequest request) {
super(request);
}
public ShardSearchRequest(SearchRequest searchRequest, ShardRouting shardRouting, int numberOfShards, boolean useSlowScroll) {
@ -103,10 +111,7 @@ public class ShardSearchRequest extends TransportRequest implements IndicesReque
this.types = searchRequest.types();
this.useSlowScroll = useSlowScroll;
this.queryCache = searchRequest.queryCache();
}
public ShardSearchRequest(ShardRouting shardRouting, int numberOfShards, SearchType searchType) {
this(shardRouting.index(), shardRouting.id(), numberOfShards, searchType);
this.originalIndices = new OriginalIndices(searchRequest);
}
public ShardSearchRequest(String index, int shardId, int numberOfShards, SearchType searchType) {
@ -114,6 +119,7 @@ public class ShardSearchRequest extends TransportRequest implements IndicesReque
this.shardId = shardId;
this.numberOfShards = numberOfShards;
this.searchType = searchType;
this.originalIndices = OriginalIndices.EMPTY;
}
public String index() {
@ -122,12 +128,12 @@ public class ShardSearchRequest extends TransportRequest implements IndicesReque
@Override
public String[] indices() {
return new String[]{index};
return originalIndices.indices();
}
@Override
public IndicesOptions indicesOptions() {
return IndicesOptions.strictSingleIndexNoExpandForbidClosed();
return originalIndices.indicesOptions();
}
public int shardId() {
@ -270,6 +276,7 @@ public class ShardSearchRequest extends TransportRequest implements IndicesReque
if (in.getVersion().onOrAfter(Version.V_1_4_0)) {
queryCache = in.readOptionalBoolean();
}
originalIndices = OriginalIndices.readOptionalOriginalIndices(in);
}
@Override
@ -318,5 +325,6 @@ public class ShardSearchRequest extends TransportRequest implements IndicesReque
if (out.getVersion().onOrAfter(Version.V_1_4_0)) {
out.writeOptionalBoolean(queryCache);
}
OriginalIndices.writeOptionalOriginalIndices(originalIndices, out);
}
}

View File

@ -19,7 +19,10 @@
package org.elasticsearch.search.query;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.dfs.AggregatedDfs;
@ -32,12 +35,14 @@ import static org.elasticsearch.search.dfs.AggregatedDfs.readAggregatedDfs;
/**
*
*/
public class QuerySearchRequest extends TransportRequest {
public class QuerySearchRequest extends TransportRequest implements IndicesRequest {
private long id;
private AggregatedDfs dfs;
private OriginalIndices originalIndices;
public QuerySearchRequest() {
}
@ -45,6 +50,7 @@ public class QuerySearchRequest extends TransportRequest {
super(request);
this.id = id;
this.dfs = dfs;
this.originalIndices = new OriginalIndices(request);
}
public long id() {
@ -55,11 +61,22 @@ public class QuerySearchRequest extends TransportRequest {
return dfs;
}
@Override
public String[] indices() {
return originalIndices.indices();
}
@Override
public IndicesOptions indicesOptions() {
return originalIndices.indicesOptions();
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
id = in.readLong();
dfs = readAggregatedDfs(in);
originalIndices = OriginalIndices.readOriginalIndices(in);
}
@Override
@ -67,5 +84,6 @@ public class QuerySearchRequest extends TransportRequest {
super.writeTo(out);
out.writeLong(id);
dfs.writeTo(out);
OriginalIndices.writeOriginalIndices(originalIndices, out);
}
}
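Since the internal requests touched in these files now implement IndicesRequest, code that only sees a TransportRequest on the receiving node can check for that interface and read the original index names and options off the wire. A minimal, hypothetical illustration (the class, method and the logging itself are not part of this commit; only IndicesRequest#indices() and IndicesRequest#indicesOptions() are assumed, as shown above):

import java.util.Arrays;

import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.transport.TransportRequest;

public class OriginalIndicesInspectionExample {

    // Hypothetical hook: if an incoming transport request carries index information,
    // print the original indices/aliases and options it was created with.
    static void inspect(String action, TransportRequest request) {
        if (request instanceof IndicesRequest) {
            IndicesRequest indicesRequest = (IndicesRequest) request;
            System.out.println("action [" + action + "] original indices "
                    + Arrays.toString(indicesRequest.indices())
                    + " with options [" + indicesRequest.indicesOptions() + "]");
        }
    }
}

The IndicesRequestTests below exercise exactly this: they intercept the internal transport requests and assert that each one reports the same indices as the request it originated from.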

View File

@ -0,0 +1,964 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action;
import com.google.common.collect.Lists;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.analyze.AnalyzeAction;
import org.elasticsearch.action.admin.indices.analyze.AnalyzeRequest;
import org.elasticsearch.action.admin.indices.cache.clear.ClearIndicesCacheAction;
import org.elasticsearch.action.admin.indices.cache.clear.ClearIndicesCacheRequest;
import org.elasticsearch.action.admin.indices.close.CloseIndexAction;
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.flush.FlushAction;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.mapping.delete.DeleteMappingAction;
import org.elasticsearch.action.admin.indices.mapping.delete.DeleteMappingRequest;
import org.elasticsearch.action.admin.indices.mapping.get.GetFieldMappingsAction;
import org.elasticsearch.action.admin.indices.mapping.get.GetFieldMappingsRequest;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsAction;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingAction;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.admin.indices.open.OpenIndexAction;
import org.elasticsearch.action.admin.indices.open.OpenIndexRequest;
import org.elasticsearch.action.admin.indices.optimize.OptimizeAction;
import org.elasticsearch.action.admin.indices.optimize.OptimizeRequest;
import org.elasticsearch.action.admin.indices.recovery.RecoveryAction;
import org.elasticsearch.action.admin.indices.recovery.RecoveryRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsAction;
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsRequest;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsAction;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsAction;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
import org.elasticsearch.action.admin.indices.validate.query.ValidateQueryAction;
import org.elasticsearch.action.admin.indices.validate.query.ValidateQueryRequest;
import org.elasticsearch.action.bulk.BulkAction;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.count.CountAction;
import org.elasticsearch.action.count.CountRequest;
import org.elasticsearch.action.delete.DeleteAction;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.deletebyquery.DeleteByQueryAction;
import org.elasticsearch.action.deletebyquery.DeleteByQueryRequest;
import org.elasticsearch.action.exists.ExistsAction;
import org.elasticsearch.action.exists.ExistsRequest;
import org.elasticsearch.action.explain.ExplainAction;
import org.elasticsearch.action.explain.ExplainRequest;
import org.elasticsearch.action.get.GetAction;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.MultiGetAction;
import org.elasticsearch.action.get.MultiGetRequest;
import org.elasticsearch.action.index.IndexAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.mlt.MoreLikeThisRequest;
import org.elasticsearch.action.percolate.MultiPercolateAction;
import org.elasticsearch.action.percolate.MultiPercolateRequest;
import org.elasticsearch.action.percolate.PercolateAction;
import org.elasticsearch.action.percolate.PercolateRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.suggest.SuggestAction;
import org.elasticsearch.action.suggest.SuggestRequest;
import org.elasticsearch.action.support.QuerySourceBuilder;
import org.elasticsearch.action.termvector.MultiTermVectorsAction;
import org.elasticsearch.action.termvector.MultiTermVectorsRequest;
import org.elasticsearch.action.termvector.TermVectorAction;
import org.elasticsearch.action.termvector.TermVectorRequest;
import org.elasticsearch.action.update.UpdateAction;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.action.SearchServiceTransportAction;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.hamcrest.Matchers.*;
@ClusterScope(scope = Scope.SUITE)
public class IndicesRequestTests extends ElasticsearchIntegrationTest {
private final List<String> indices = new ArrayList<>();
private Client nodeClient;
@Override
protected int minimumNumberOfShards() {
//makes sure that a reduce is always needed when searching
return 2;
}
@Override
protected int minimumNumberOfReplicas() {
//makes sure that write operations get sent to the replica as well
//so we are able to intercept those messages and check them
return 1;
}
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return ImmutableSettings.settingsBuilder()
.put(TransportModule.TRANSPORT_SERVICE_TYPE_KEY, InterceptingTransportService.class.getName())
.put(super.nodeSettings(nodeOrdinal)).build();
}
@Before
public void setup() {
//make sure there is a node client around before each test starts
nodeClient = internalCluster().clientNodeClient();
int numIndices = iterations(1, 5);
for (int i = 0; i < numIndices; i++) {
indices.add("test" + i);
}
for (String index : indices) {
assertAcked(prepareCreate(index).addAlias(new Alias(index + "-alias")));
}
ensureGreen();
}
@After
public void cleanUp() {
assertAllRequestsHaveBeenConsumed();
clearInterceptedActions();
indices.clear();
}
@Test
public void testGetFieldMappings() {
String getFieldMappingsShardAction = GetFieldMappingsAction.NAME + "[index][s]";
interceptTransportActions(getFieldMappingsShardAction);
GetFieldMappingsRequest getFieldMappingsRequest = new GetFieldMappingsRequest();
getFieldMappingsRequest.indices(randomIndicesOrAliases());
nodeClient.admin().indices().getFieldMappings(getFieldMappingsRequest).actionGet();
assertSameIndices(getFieldMappingsRequest, getFieldMappingsShardAction);
}
@Test
public void testAnalyze() {
String analyzeShardAction = AnalyzeAction.NAME + "[s]";
interceptTransportActions(analyzeShardAction);
AnalyzeRequest analyzeRequest = new AnalyzeRequest(randomIndexOrAlias(), "text");
nodeClient.admin().indices().analyze(analyzeRequest).actionGet();
assertSameIndices(analyzeRequest, analyzeShardAction);
}
@Test
public void testIndex() {
String[] indexShardActions = new String[]{IndexAction.NAME, IndexAction.NAME + "[r]"};
interceptTransportActions(indexShardActions);
IndexRequest indexRequest = new IndexRequest(randomIndexOrAlias(), "type", "id").source("field", "value");
nodeClient.index(indexRequest).actionGet();
assertSameIndices(indexRequest, indexShardActions);
}
@Test
public void testDelete() {
String[] deleteShardActions = new String[]{DeleteAction.NAME, DeleteAction.NAME + "[r]"};
interceptTransportActions(deleteShardActions);
DeleteRequest deleteRequest = new DeleteRequest(randomIndexOrAlias(), "type", "id");
nodeClient.delete(deleteRequest).actionGet();
assertSameIndices(deleteRequest, deleteShardActions);
}
@Test
public void testUpdate() {
//update action goes to the primary, index op gets executed locally, then replicated
String[] updateShardActions = new String[]{UpdateAction.NAME, IndexAction.NAME + "[r]"};
interceptTransportActions(updateShardActions);
String indexOrAlias = randomIndexOrAlias();
client().prepareIndex(indexOrAlias, "type", "id").setSource("field", "value").get();
UpdateRequest updateRequest = new UpdateRequest(indexOrAlias, "type", "id").doc("field1", "value1");
UpdateResponse updateResponse = nodeClient.update(updateRequest).actionGet();
assertThat(updateResponse.isCreated(), equalTo(false));
assertSameIndices(updateRequest, updateShardActions);
}
@Test
public void testUpdateUpsert() {
//update action goes to the primary, index op gets executed locally, then replicated
String[] updateShardActions = new String[]{UpdateAction.NAME, IndexAction.NAME + "[r]"};
interceptTransportActions(updateShardActions);
String indexOrAlias = randomIndexOrAlias();
UpdateRequest updateRequest = new UpdateRequest(indexOrAlias, "type", "id").upsert("field", "value").doc("field1", "value1");
UpdateResponse updateResponse = nodeClient.update(updateRequest).actionGet();
assertThat(updateResponse.isCreated(), equalTo(true));
assertSameIndices(updateRequest, updateShardActions);
}
@Test
public void testUpdateDelete() {
//update action goes to the primary, delete op gets executed locally, then replicated
String[] updateShardActions = new String[]{UpdateAction.NAME, DeleteAction.NAME + "[r]"};
interceptTransportActions(updateShardActions);
String indexOrAlias = randomIndexOrAlias();
client().prepareIndex(indexOrAlias, "type", "id").setSource("field", "value").get();
UpdateRequest updateRequest = new UpdateRequest(indexOrAlias, "type", "id").script("ctx.op='delete'");
UpdateResponse updateResponse = nodeClient.update(updateRequest).actionGet();
assertThat(updateResponse.isCreated(), equalTo(false));
assertSameIndices(updateRequest, updateShardActions);
}
@Test
public void testDeleteByQuery() {
String[] deleteByQueryShardActions = new String[]{DeleteByQueryAction.NAME + "[s]", DeleteByQueryAction.NAME + "[s][r]"};
interceptTransportActions(deleteByQueryShardActions);
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(randomIndicesOrAliases()).source(new QuerySourceBuilder().setQuery(QueryBuilders.matchAllQuery()));
nodeClient.deleteByQuery(deleteByQueryRequest).actionGet();
assertSameIndices(deleteByQueryRequest, deleteByQueryShardActions);
}
@Test
public void testBulk() {
String[] bulkShardActions = new String[]{BulkAction.NAME + "[s]", BulkAction.NAME + "[s][r]"};
interceptTransportActions(bulkShardActions);
List<String> indices = new ArrayList<>();
BulkRequest bulkRequest = new BulkRequest();
int numIndexRequests = iterations(1, 10);
for (int i = 0; i < numIndexRequests; i++) {
String indexOrAlias = randomIndexOrAlias();
bulkRequest.add(new IndexRequest(indexOrAlias, "type", "id").source("field", "value"));
indices.add(indexOrAlias);
}
int numDeleteRequests = iterations(1, 10);
for (int i = 0; i < numDeleteRequests; i++) {
String indexOrAlias = randomIndexOrAlias();
bulkRequest.add(new DeleteRequest(indexOrAlias, "type", "id"));
indices.add(indexOrAlias);
}
int numUpdateRequests = iterations(1, 10);
for (int i = 0; i < numUpdateRequests; i++) {
String indexOrAlias = randomIndexOrAlias();
bulkRequest.add(new UpdateRequest(indexOrAlias, "type", "id").doc("field1", "value1"));
indices.add(indexOrAlias);
}
nodeClient.bulk(bulkRequest).actionGet();
assertIndicesSubset(indices, bulkShardActions);
}
@Test
public void testGet() {
String getShardAction = GetAction.NAME + "[s]";
interceptTransportActions(getShardAction);
GetRequest getRequest = new GetRequest(randomIndexOrAlias(), "type", "id");
nodeClient.get(getRequest).actionGet();
assertSameIndices(getRequest, getShardAction);
}
@Test
public void testExplain() {
String explainShardAction = ExplainAction.NAME + "[s]";
interceptTransportActions(explainShardAction);
ExplainRequest explainRequest = new ExplainRequest(randomIndexOrAlias(), "type", "id").source(new QuerySourceBuilder().setQuery(QueryBuilders.matchAllQuery()));
nodeClient.explain(explainRequest).actionGet();
assertSameIndices(explainRequest, explainShardAction);
}
@Test
public void testTermVector() {
String termVectorShardAction = TermVectorAction.NAME + "[s]";
interceptTransportActions(termVectorShardAction);
TermVectorRequest termVectorRequest = new TermVectorRequest(randomIndexOrAlias(), "type", "id");
nodeClient.termVector(termVectorRequest).actionGet();
assertSameIndices(termVectorRequest, termVectorShardAction);
}
@Test
public void testMultiTermVector() {
String multiTermVectorsShardAction = MultiTermVectorsAction.NAME + "[shard][s]";
interceptTransportActions(multiTermVectorsShardAction);
List<String> indices = new ArrayList<>();
MultiTermVectorsRequest multiTermVectorsRequest = new MultiTermVectorsRequest();
int numDocs = iterations(1, 30);
for (int i = 0; i < numDocs; i++) {
String indexOrAlias = randomIndexOrAlias();
multiTermVectorsRequest.add(indexOrAlias, "type", Integer.toString(i));
indices.add(indexOrAlias);
}
nodeClient.multiTermVectors(multiTermVectorsRequest).actionGet();
assertIndicesSubset(indices, multiTermVectorsShardAction);
}
@Test
public void testMultiGet() {
String multiGetShardAction = MultiGetAction.NAME + "[shard][s]";
interceptTransportActions(multiGetShardAction);
List<String> indices = new ArrayList<>();
MultiGetRequest multiGetRequest = new MultiGetRequest();
int numDocs = iterations(1, 30);
for (int i = 0; i < numDocs; i++) {
String indexOrAlias = randomIndexOrAlias();
multiGetRequest.add(indexOrAlias, "type", Integer.toString(i));
indices.add(indexOrAlias);
}
nodeClient.multiGet(multiGetRequest).actionGet();
assertIndicesSubset(indices, multiGetShardAction);
}
@Test
public void testCount() {
String countShardAction = CountAction.NAME + "[s]";
interceptTransportActions(countShardAction);
CountRequest countRequest = new CountRequest(randomIndicesOrAliases());
nodeClient.count(countRequest).actionGet();
assertSameIndices(countRequest, countShardAction);
}
@Test
public void testExists() {
String existsShardAction = ExistsAction.NAME + "[s]";
interceptTransportActions(existsShardAction);
ExistsRequest existsRequest = new ExistsRequest(randomIndicesOrAliases());
nodeClient.exists(existsRequest).actionGet();
assertSameIndices(existsRequest, existsShardAction);
}
@Test
public void testFlush() {
String flushShardAction = FlushAction.NAME + "[s]";
interceptTransportActions(flushShardAction);
FlushRequest flushRequest = new FlushRequest(randomIndicesOrAliases());
nodeClient.admin().indices().flush(flushRequest).actionGet();
assertSameIndices(flushRequest, flushShardAction);
}
@Test
public void testOptimize() {
String optimizeShardAction = OptimizeAction.NAME + "[s]";
interceptTransportActions(optimizeShardAction);
OptimizeRequest optimizeRequest = new OptimizeRequest(randomIndicesOrAliases());
nodeClient.admin().indices().optimize(optimizeRequest).actionGet();
assertSameIndices(optimizeRequest, optimizeShardAction);
}
@Test
public void testRefresh() {
String refreshShardAction = RefreshAction.NAME + "[s]";
interceptTransportActions(refreshShardAction);
RefreshRequest refreshRequest = new RefreshRequest(randomIndicesOrAliases());
nodeClient.admin().indices().refresh(refreshRequest).actionGet();
assertSameIndices(refreshRequest, refreshShardAction);
}
@Test
public void testClearCache() {
String clearCacheAction = ClearIndicesCacheAction.NAME + "[s]";
interceptTransportActions(clearCacheAction);
ClearIndicesCacheRequest clearIndicesCacheRequest = new ClearIndicesCacheRequest(randomIndicesOrAliases());
nodeClient.admin().indices().clearCache(clearIndicesCacheRequest).actionGet();
assertSameIndices(clearIndicesCacheRequest, clearCacheAction);
}
@Test
public void testRecovery() {
String recoveryAction = RecoveryAction.NAME + "[s]";
interceptTransportActions(recoveryAction);
RecoveryRequest recoveryRequest = new RecoveryRequest(randomIndicesOrAliases());
nodeClient.admin().indices().recoveries(recoveryRequest).actionGet();
assertSameIndices(recoveryRequest, recoveryAction);
}
@Test
public void testSegments() {
String segmentsAction = IndicesSegmentsAction.NAME + "[s]";
interceptTransportActions(segmentsAction);
IndicesSegmentsRequest segmentsRequest = new IndicesSegmentsRequest(randomIndicesOrAliases());
nodeClient.admin().indices().segments(segmentsRequest).actionGet();
assertSameIndices(segmentsRequest, segmentsAction);
}
@Test
public void testIndicesStats() {
String indicesStats = IndicesStatsAction.NAME + "[s]";
interceptTransportActions(indicesStats);
IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest().indices(randomIndicesOrAliases());
nodeClient.admin().indices().stats(indicesStatsRequest).actionGet();
assertSameIndices(indicesStatsRequest, indicesStats);
}
@Test
public void testSuggest() {
String suggestAction = SuggestAction.NAME + "[s]";
interceptTransportActions(suggestAction);
SuggestRequest suggestRequest = new SuggestRequest(randomIndicesOrAliases());
nodeClient.suggest(suggestRequest).actionGet();
assertSameIndices(suggestRequest, suggestAction);
}
@Test
public void testValidateQuery() {
String validateQueryShardAction = ValidateQueryAction.NAME + "[s]";
interceptTransportActions(validateQueryShardAction);
ValidateQueryRequest validateQueryRequest = new ValidateQueryRequest(randomIndicesOrAliases());
nodeClient.admin().indices().validateQuery(validateQueryRequest).actionGet();
assertSameIndices(validateQueryRequest, validateQueryShardAction);
}
@Test
public void testPercolate() {
String percolateShardAction = PercolateAction.NAME + "[s]";
interceptTransportActions(percolateShardAction);
client().prepareIndex("test-get", "type", "1").setSource("field","value").get();
PercolateRequest percolateRequest = new PercolateRequest().indices(randomIndicesOrAliases()).documentType("type");
if (randomBoolean()) {
percolateRequest.getRequest(new GetRequest("test-get", "type", "1"));
} else {
percolateRequest.source("\"field\":\"value\"");
}
nodeClient.percolate(percolateRequest).actionGet();
assertSameIndices(percolateRequest, percolateShardAction);
}
@Test
public void testMultiPercolate() {
String multiPercolateShardAction = MultiPercolateAction.NAME + "[shard][s]";
interceptTransportActions(multiPercolateShardAction);
client().prepareIndex("test-get", "type", "1").setSource("field", "value").get();
MultiPercolateRequest multiPercolateRequest = new MultiPercolateRequest();
List<String> indices = new ArrayList<>();
int numRequests = iterations(1, 30);
for (int i = 0; i < numRequests; i++) {
String[] indicesOrAliases = randomIndicesOrAliases();
Collections.addAll(indices, indicesOrAliases);
PercolateRequest percolateRequest = new PercolateRequest().indices(indicesOrAliases).documentType("type");
if (randomBoolean()) {
percolateRequest.getRequest(new GetRequest("test-get", "type", "1"));
} else {
percolateRequest.source("\"field\":\"value\"");
}
multiPercolateRequest.add(percolateRequest);
}
nodeClient.multiPercolate(multiPercolateRequest).actionGet();
assertIndicesSubset(indices, multiPercolateShardAction);
}
@Test
public void testOpenIndex() {
interceptTransportActions(OpenIndexAction.NAME);
OpenIndexRequest openIndexRequest = new OpenIndexRequest(randomUniqueIndicesOrAliases());
nodeClient.admin().indices().open(openIndexRequest).actionGet();
assertSameIndices(openIndexRequest, OpenIndexAction.NAME);
}
@Test
public void testCloseIndex() {
interceptTransportActions(CloseIndexAction.NAME);
CloseIndexRequest closeIndexRequest = new CloseIndexRequest(randomUniqueIndicesOrAliases());
nodeClient.admin().indices().close(closeIndexRequest).actionGet();
assertSameIndices(closeIndexRequest, CloseIndexAction.NAME);
}
@Test
public void testDeleteIndex() {
interceptTransportActions(DeleteIndexAction.NAME);
String[] randomIndicesOrAliases = randomUniqueIndicesOrAliases();
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(randomIndicesOrAliases);
assertAcked(nodeClient.admin().indices().delete(deleteIndexRequest).actionGet());
assertSameIndices(deleteIndexRequest, DeleteIndexAction.NAME);
//explicitly clean up, otherwise the delete-index call issued after the test gets intercepted too and assertAllRequestsHaveBeenConsumed fails
clearInterceptedActions();
}
@Test
public void testGetMappings() {
interceptTransportActions(GetMappingsAction.NAME);
GetMappingsRequest getMappingsRequest = new GetMappingsRequest().indices(randomIndicesOrAliases());
nodeClient.admin().indices().getMappings(getMappingsRequest).actionGet();
assertSameIndices(getMappingsRequest, GetMappingsAction.NAME);
}
@Test
public void testPutMapping() {
interceptTransportActions(PutMappingAction.NAME);
PutMappingRequest putMappingRequest = new PutMappingRequest(randomUniqueIndicesOrAliases()).type("type").source("field", "type=string");
nodeClient.admin().indices().putMapping(putMappingRequest).actionGet();
assertSameIndices(putMappingRequest, PutMappingAction.NAME);
}
@Test
public void testDeleteMapping() {
interceptTransportActions(DeleteMappingAction.NAME);
String[] indices = randomUniqueIndicesOrAliases();
client().admin().indices().putMapping(new PutMappingRequest(indices).type("type").source("field", "type=string")).actionGet();
DeleteMappingRequest deleteMappingRequest = new DeleteMappingRequest(indices).types("type");
nodeClient.admin().indices().deleteMapping(deleteMappingRequest).actionGet();
assertSameIndices(deleteMappingRequest, DeleteMappingAction.NAME);
}
@Test
public void testGetSettings() {
interceptTransportActions(GetSettingsAction.NAME);
GetSettingsRequest getSettingsRequest = new GetSettingsRequest().indices(randomIndicesOrAliases());
nodeClient.admin().indices().getSettings(getSettingsRequest).actionGet();
assertSameIndices(getSettingsRequest, GetSettingsAction.NAME);
}
@Test
public void testUpdateSettings() {
interceptTransportActions(UpdateSettingsAction.NAME);
UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(randomIndicesOrAliases()).settings(ImmutableSettings.builder().put("refresh_interval", -1));
nodeClient.admin().indices().updateSettings(updateSettingsRequest).actionGet();
assertSameIndices(updateSettingsRequest, UpdateSettingsAction.NAME);
}
@Test
public void testSearchQueryThenFetch() throws Exception {
interceptTransportActions(SearchServiceTransportAction.QUERY_ACTION_NAME,
SearchServiceTransportAction.FETCH_ID_ACTION_NAME, SearchServiceTransportAction.FREE_CONTEXT_ACTION_NAME);
String[] randomIndicesOrAliases = randomIndicesOrAliases();
for (int i = 0; i < randomIndicesOrAliases.length; i++) {
client().prepareIndex(randomIndicesOrAliases[i], "type", "id-" + i).setSource("field", "value").get();
}
refresh();
SearchRequest searchRequest = new SearchRequest(randomIndicesOrAliases).searchType(SearchType.QUERY_THEN_FETCH);
SearchResponse searchResponse = nodeClient.search(searchRequest).actionGet();
assertNoFailures(searchResponse);
assertThat(searchResponse.getHits().totalHits(), greaterThan(0l));
//explicitly stop intercepting requests: freeing the search context is async, so those requests might keep coming in
//after the checks and make assertAllRequestsHaveBeenConsumed fail
clearInterceptedActions();
assertSameIndices(searchRequest, SearchServiceTransportAction.QUERY_ACTION_NAME, SearchServiceTransportAction.FETCH_ID_ACTION_NAME);
//free context messages are not necessarily sent, but if they are, check their indices
assertSameIndicesOptionalRequests(searchRequest, SearchServiceTransportAction.FREE_CONTEXT_ACTION_NAME);
}
@Test
public void testSearchDfsQueryThenFetch() throws Exception {
interceptTransportActions(SearchServiceTransportAction.DFS_ACTION_NAME, SearchServiceTransportAction.QUERY_ID_ACTION_NAME,
SearchServiceTransportAction.FETCH_ID_ACTION_NAME, SearchServiceTransportAction.FREE_CONTEXT_ACTION_NAME);
String[] randomIndicesOrAliases = randomIndicesOrAliases();
for (int i = 0; i < randomIndicesOrAliases.length; i++) {
client().prepareIndex(randomIndicesOrAliases[i], "type", "id-" + i).setSource("field", "value").get();
}
refresh();
SearchRequest searchRequest = new SearchRequest(randomIndicesOrAliases).searchType(SearchType.DFS_QUERY_THEN_FETCH);
SearchResponse searchResponse = nodeClient.search(searchRequest).actionGet();
assertNoFailures(searchResponse);
assertThat(searchResponse.getHits().totalHits(), greaterThan(0l));
//explicitly stop intercepting requests: freeing the search context is async, so those requests might keep coming in
//after the checks and make assertAllRequestsHaveBeenConsumed fail
clearInterceptedActions();
assertSameIndices(searchRequest, SearchServiceTransportAction.DFS_ACTION_NAME, SearchServiceTransportAction.QUERY_ID_ACTION_NAME,
SearchServiceTransportAction.FETCH_ID_ACTION_NAME);
//free context messages are not necessarily sent, but if they are, check their indices
assertSameIndicesOptionalRequests(searchRequest, SearchServiceTransportAction.FREE_CONTEXT_ACTION_NAME);
}
@Test
public void testSearchQueryAndFetch() throws Exception {
interceptTransportActions(SearchServiceTransportAction.QUERY_FETCH_ACTION_NAME,
SearchServiceTransportAction.FREE_CONTEXT_ACTION_NAME);
String[] randomIndicesOrAliases = randomIndicesOrAliases();
for (int i = 0; i < randomIndicesOrAliases.length; i++) {
client().prepareIndex(randomIndicesOrAliases[i], "type", "id-" + i).setSource("field", "value").get();
}
refresh();
SearchRequest searchRequest = new SearchRequest(randomIndicesOrAliases).searchType(SearchType.QUERY_AND_FETCH);
SearchResponse searchResponse = nodeClient.search(searchRequest).actionGet();
assertNoFailures(searchResponse);
assertThat(searchResponse.getHits().totalHits(), greaterThan(0l));
//explicitly stop intercepting requests: freeing the search context is async, so those requests might keep coming in
//after the checks and make assertAllRequestsHaveBeenConsumed fail
clearInterceptedActions();
assertSameIndices(searchRequest, SearchServiceTransportAction.QUERY_FETCH_ACTION_NAME);
//free context messages are not necessarily sent, but if they are, check their indices
assertSameIndicesOptionalRequests(searchRequest, SearchServiceTransportAction.FREE_CONTEXT_ACTION_NAME);
}
@Test
public void testSearchDfsQueryAndFetch() throws Exception {
interceptTransportActions(SearchServiceTransportAction.QUERY_QUERY_FETCH_ACTION_NAME,
SearchServiceTransportAction.FREE_CONTEXT_ACTION_NAME);
String[] randomIndicesOrAliases = randomIndicesOrAliases();
for (int i = 0; i < randomIndicesOrAliases.length; i++) {
client().prepareIndex(randomIndicesOrAliases[i], "type", "id-" + i).setSource("field", "value").get();
}
refresh();
SearchRequest searchRequest = new SearchRequest(randomIndicesOrAliases).searchType(SearchType.DFS_QUERY_AND_FETCH);
SearchResponse searchResponse = nodeClient.search(searchRequest).actionGet();
assertNoFailures(searchResponse);
assertThat(searchResponse.getHits().totalHits(), greaterThan(0l));
//explicitly stop intercepting requests: freeing the search context is async, so those requests might keep coming in
//after the checks and make assertAllRequestsHaveBeenConsumed fail
clearInterceptedActions();
assertSameIndices(searchRequest, SearchServiceTransportAction.QUERY_QUERY_FETCH_ACTION_NAME);
//free context messages are not necessarily sent, but if they are, check their indices
assertSameIndicesOptionalRequests(searchRequest, SearchServiceTransportAction.FREE_CONTEXT_ACTION_NAME);
}
@Test
public void testSearchScan() throws Exception {
interceptTransportActions(SearchServiceTransportAction.SCAN_ACTION_NAME);
String[] randomIndicesOrAliases = randomIndicesOrAliases();
for (int i = 0; i < randomIndicesOrAliases.length; i++) {
client().prepareIndex(randomIndicesOrAliases[i], "type", "id-" + i).setSource("field", "value").get();
}
refresh();
SearchRequest searchRequest = new SearchRequest(randomIndicesOrAliases).searchType(SearchType.SCAN).scroll(new TimeValue(500));
SearchResponse searchResponse = nodeClient.search(searchRequest).actionGet();
assertNoFailures(searchResponse);
assertThat(searchResponse.getHits().totalHits(), greaterThan(0l));
client().prepareClearScroll().addScrollId(searchResponse.getScrollId()).get();
//explicitly stop intercepting requests: freeing the search context is async, so those requests might keep coming in
//after the checks and make assertAllRequestsHaveBeenConsumed fail
clearInterceptedActions();
assertSameIndices(searchRequest, SearchServiceTransportAction.SCAN_ACTION_NAME);
}
@Test
public void testMoreLikeThis() {
interceptTransportActions(GetAction.NAME + "[s]", SearchServiceTransportAction.QUERY_ACTION_NAME,
SearchServiceTransportAction.FETCH_ID_ACTION_NAME, SearchServiceTransportAction.FREE_CONTEXT_ACTION_NAME);
String[] randomIndicesOrAliases = randomIndicesOrAliases();
for (int i = 0; i < randomIndicesOrAliases.length; i++) {
client().prepareIndex(randomIndicesOrAliases[i], "type", "id-" + i).setSource("field", "value").get();
}
refresh();
assertAcked(prepareCreate("test-get").addAlias(new Alias("alias-get")));
client().prepareIndex("test-get", "type", "1").setSource("field","value").get();
String indexGet = randomBoolean() ? "test-get" : "alias-get";
MoreLikeThisRequest moreLikeThisRequest = new MoreLikeThisRequest(indexGet).type("type").id("1")
.searchIndices(randomIndicesOrAliases());
nodeClient.moreLikeThis(moreLikeThisRequest).actionGet();
//explicitly stop intercepting requests: freeing the search context is async, so those requests might keep coming in
//after the checks and make assertAllRequestsHaveBeenConsumed fail
clearInterceptedActions();
//get might end up being executed locally, only optionally over the transport
assertSameIndicesOptionalRequests(new String[]{indexGet}, GetAction.NAME + "[s]");
//query might end up being executed locally as well, only optionally over the transport
assertSameIndicesOptionalRequests(moreLikeThisRequest.searchIndices(), SearchServiceTransportAction.QUERY_ACTION_NAME);
//free context messages are not necessarily sent through the transport, but if they are, check their indices
assertSameIndicesOptionalRequests(moreLikeThisRequest.searchIndices(), SearchServiceTransportAction.FETCH_ID_ACTION_NAME, SearchServiceTransportAction.FREE_CONTEXT_ACTION_NAME);
}
private static void assertSameIndices(IndicesRequest originalRequest, String... actions) {
assertSameIndices(originalRequest, false, actions);
}
private static void assertSameIndicesOptionalRequests(IndicesRequest originalRequest, String... actions) {
assertSameIndices(originalRequest, true, actions);
}
private static void assertSameIndices(IndicesRequest originalRequest, boolean optional, String... actions) {
for (String action : actions) {
List<TransportRequest> requests = consumeTransportRequests(action);
if (!optional) {
assertThat("no internal requests intercepted for action [" + action + "]", requests.size(), greaterThan(0));
}
for (TransportRequest internalRequest : requests) {
assertThat(internalRequest, instanceOf(IndicesRequest.class));
assertThat(internalRequest.getClass().getName(), ((IndicesRequest)internalRequest).indices(), equalTo(originalRequest.indices()));
assertThat(((IndicesRequest)internalRequest).indicesOptions(), equalTo(originalRequest.indicesOptions()));
}
}
}
private static void assertSameIndices(String[] indices, String... actions) {
assertSameIndices(indices, false, actions);
}
private static void assertSameIndicesOptionalRequests(String[] indices, String... actions) {
assertSameIndices(indices, true, actions);
}
private static void assertSameIndices(String[] indices, boolean optional, String... actions) {
for (String action : actions) {
List<TransportRequest> requests = consumeTransportRequests(action);
if (!optional) {
assertThat("no internal requests intercepted for action [" + action + "]", requests.size(), greaterThan(0));
}
for (TransportRequest internalRequest : requests) {
assertThat(internalRequest, instanceOf(IndicesRequest.class));
assertThat(internalRequest.getClass().getName(), ((IndicesRequest)internalRequest).indices(), equalTo(indices));
}
}
}
private static void assertIndicesSubset(List<String> indices, String... actions) {
//indices returned by each bulk shard request need to be a subset of the original indices
for (String action : actions) {
List<TransportRequest> requests = consumeTransportRequests(action);
assertThat("no internal requests intercepted for action [" + action + "]", requests.size(), greaterThan(0));
for (TransportRequest internalRequest : requests) {
assertThat(internalRequest, instanceOf(IndicesRequest.class));
for (String index : ((IndicesRequest) internalRequest).indices()) {
assertThat(indices, hasItem(index));
}
}
}
}
private String randomIndexOrAlias() {
String index = randomFrom(indices);
if (randomBoolean()) {
return index + "-alias";
} else {
return index;
}
}
private String[] randomIndicesOrAliases() {
int count = randomIntBetween(1, indices.size() * 2); //every index has an alias
String[] indices = new String[count];
for (int i = 0; i < count; i++) {
indices[i] = randomIndexOrAlias();
}
return indices;
}
private String[] randomUniqueIndicesOrAliases() {
Set<String> uniqueIndices = new HashSet<>();
int count = randomIntBetween(1, this.indices.size());
while (uniqueIndices.size() < count) {
uniqueIndices.add(randomFrom(this.indices));
}
String[] indices = new String[count];
int i = 0;
for (String index : uniqueIndices) {
indices[i++] = randomBoolean() ? index + "-alias" : index;
}
return indices;
}
private static void assertAllRequestsHaveBeenConsumed() {
Iterable<TransportService> transportServices = internalCluster().getInstances(TransportService.class);
for (TransportService transportService : transportServices) {
assertThat(((InterceptingTransportService)transportService).requests.isEmpty(), equalTo(true));
}
}
private static void clearInterceptedActions() {
Iterable<TransportService> transportServices = internalCluster().getInstances(TransportService.class);
for (TransportService transportService : transportServices) {
((InterceptingTransportService) transportService).clearInterceptedActions();
}
}
private static void interceptTransportActions(String... actions) {
Iterable<TransportService> transportServices = internalCluster().getInstances(TransportService.class);
for (TransportService transportService : transportServices) {
((InterceptingTransportService) transportService).interceptTransportActions(actions);
}
}
private static List<TransportRequest> consumeTransportRequests(String action) {
List<TransportRequest> requests = new ArrayList<>();
Iterable<TransportService> transportServices = internalCluster().getInstances(TransportService.class);
for (TransportService transportService : transportServices) {
List<TransportRequest> transportRequests = ((InterceptingTransportService) transportService).consumeRequests(action);
if (transportRequests != null) {
requests.addAll(transportRequests);
}
}
return requests;
}
public static class InterceptingTransportService extends TransportService {
private final Set<String> actions = new CopyOnWriteArraySet<>();
private final ConcurrentMap<String, List<TransportRequest>> requests = new ConcurrentHashMap<>();
@Inject
public InterceptingTransportService(Settings settings, Transport transport, ThreadPool threadPool) {
super(settings, transport, threadPool);
}
List<TransportRequest> consumeRequests(String action) {
return requests.remove(action);
}
void interceptTransportActions(String... actions) {
Collections.addAll(this.actions, actions);
}
void clearInterceptedActions() {
actions.clear();
}
@Override
public void registerHandler(String action, TransportRequestHandler handler) {
super.registerHandler(action, new InterceptingRequestHandler(action, handler));
}
private class InterceptingRequestHandler implements TransportRequestHandler {
private final TransportRequestHandler requestHandler;
private final String action;
InterceptingRequestHandler(String action, TransportRequestHandler requestHandler) {
this.requestHandler = requestHandler;
this.action = action;
}
@Override
public TransportRequest newInstance() {
return requestHandler.newInstance();
}
@Override
@SuppressWarnings("unchecked")
public void messageReceived(TransportRequest request, TransportChannel channel) throws Exception {
if (actions.contains(action)) {
List<TransportRequest> transportRequests = requests.putIfAbsent(action, Lists.newArrayList(request));
if (transportRequests != null) {
transportRequests.add(request);
}
}
requestHandler.messageReceived(request, channel);
}
@Override
public String executor() {
return requestHandler.executor();
}
@Override
public boolean isForceExecution() {
return requestHandler.isForceExecution();
}
}
}
}

View File

@ -0,0 +1,103 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.junit.Test;
import java.io.IOException;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.nullValue;
public class OriginalIndicesTests extends ElasticsearchTestCase {
private static final IndicesOptions[] indicesOptionsValues = new IndicesOptions[]{
IndicesOptions.lenientExpandOpen(), IndicesOptions.strictExpand(), IndicesOptions.strictExpandOpen(),
IndicesOptions.strictExpandOpenAndForbidClosed(), IndicesOptions.strictSingleIndexNoExpandForbidClosed()};
@Test
public void testOriginalIndicesSerialization() throws IOException {
int iterations = iterations(10, 30);
for (int i = 0; i < iterations; i++) {
OriginalIndices originalIndices = randomOriginalIndices();
BytesStreamOutput out = new BytesStreamOutput();
out.setVersion(randomVersion());
OriginalIndices.writeOriginalIndices(originalIndices, out);
BytesStreamInput in = new BytesStreamInput(out.bytes());
in.setVersion(out.getVersion());
OriginalIndices originalIndices2 = OriginalIndices.readOriginalIndices(in);
if (out.getVersion().onOrAfter(Version.V_1_4_0)) {
assertThat(originalIndices2.indices(), equalTo(originalIndices.indices()));
assertThat(originalIndices2.indicesOptions(), equalTo(originalIndices.indicesOptions()));
} else {
assertThat(originalIndices2.indices(), nullValue());
assertThat(originalIndices2.indicesOptions(), nullValue());
}
}
}
@Test
public void testOptionalOriginalIndicesSerialization() throws IOException {
int iterations = iterations(10, 30);
for (int i = 0; i < iterations; i++) {
OriginalIndices originalIndices;
boolean missing = randomBoolean();
if (missing) {
originalIndices = OriginalIndices.EMPTY;
} else {
originalIndices = randomOriginalIndices();
}
BytesStreamOutput out = new BytesStreamOutput();
out.setVersion(randomVersion());
OriginalIndices.writeOptionalOriginalIndices(originalIndices, out);
BytesStreamInput in = new BytesStreamInput(out.bytes());
in.setVersion(out.getVersion());
OriginalIndices originalIndices2 = OriginalIndices.readOptionalOriginalIndices(in);
if (out.getVersion().onOrAfter(Version.V_1_4_0)) {
assertThat(originalIndices2.indices(), equalTo(originalIndices.indices()));
assertThat(originalIndices2.indicesOptions(), equalTo(originalIndices.indicesOptions()));
} else {
assertThat(originalIndices2.indices(), nullValue());
assertThat(originalIndices2.indicesOptions(), nullValue());
}
}
}
private static OriginalIndices randomOriginalIndices() {
int numIndices = randomInt(10);
String[] indices = new String[numIndices];
for (int j = 0; j < indices.length; j++) {
indices[j] = randomAsciiOfLength(randomIntBetween(1, 10));
}
IndicesOptions indicesOptions = randomFrom(indicesOptionsValues);
return new OriginalIndices(indices, indicesOptions);
}
}
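The version checks above encode the backwards compatibility expectation: a request read from a pre-1.4.0 stream carries no original indices, so they come back as null. A consumer that needs index names in every case would presumably have to fall back to the concrete index carried by the shard level request itself, roughly along these lines (resolveIndices and the wrapper class are hypothetical helpers, not part of this commit):

import org.elasticsearch.action.OriginalIndices;

public class OriginalIndicesFallbackExample {

    // Hypothetical helper: prefer the original indices when the sender provided them (1.4.0+),
    // otherwise fall back to the concrete index the shard request was routed to.
    static String[] resolveIndices(OriginalIndices originalIndices, String concreteIndex) {
        if (originalIndices.indices() == null) {
            return new String[]{concreteIndex};
        }
        return originalIndices.indices();
    }
}

The MultiGetShardRequestTests below make the same point for the multi_get shard request: before 1.4 each item only knows the concrete index it was routed to.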

View File

@ -0,0 +1,113 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.get;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.search.fetch.source.FetchSourceContext;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.junit.Test;
import java.io.IOException;
import static org.hamcrest.CoreMatchers.equalTo;
public class MultiGetShardRequestTests extends ElasticsearchTestCase {
@Test
public void testSerialization() throws IOException {
MultiGetRequest multiGetRequest = new MultiGetRequest();
if (randomBoolean()) {
multiGetRequest.preference(randomAsciiOfLength(randomIntBetween(1, 10)));
}
if (randomBoolean()) {
multiGetRequest.realtime(false);
}
if (randomBoolean()) {
multiGetRequest.refresh(true);
}
multiGetRequest.ignoreErrorsOnGeneratedFields(randomBoolean());
MultiGetShardRequest multiGetShardRequest = new MultiGetShardRequest(multiGetRequest, "index", 0);
int numItems = iterations(10, 30);
for (int i = 0; i < numItems; i++) {
MultiGetRequest.Item item = new MultiGetRequest.Item("alias-" + randomAsciiOfLength(randomIntBetween(1, 10)), "type", "id-" + i);
if (randomBoolean()) {
int numFields = randomIntBetween(1, 5);
String[] fields = new String[numFields];
for (int j = 0; j < fields.length; j++) {
fields[j] = randomAsciiOfLength(randomIntBetween(1, 10));
}
item.fields(fields);
}
if (randomBoolean()) {
item.version(randomIntBetween(1, Integer.MAX_VALUE));
item.versionType(randomFrom(VersionType.values()));
}
if (randomBoolean()) {
item.fetchSourceContext(new FetchSourceContext(randomBoolean()));
}
multiGetShardRequest.add(0, item);
}
BytesStreamOutput out = new BytesStreamOutput();
out.setVersion(randomVersion());
multiGetShardRequest.writeTo(out);
BytesStreamInput in = new BytesStreamInput(out.bytes());
in.setVersion(out.getVersion());
MultiGetShardRequest multiGetShardRequest2 = new MultiGetShardRequest();
multiGetShardRequest2.readFrom(in);
assertThat(multiGetShardRequest2.index(), equalTo(multiGetShardRequest.index()));
assertThat(multiGetShardRequest2.preference(), equalTo(multiGetShardRequest.preference()));
assertThat(multiGetShardRequest2.realtime(), equalTo(multiGetShardRequest.realtime()));
assertThat(multiGetShardRequest2.refresh(), equalTo(multiGetShardRequest.refresh()));
if (in.getVersion().onOrAfter(Version.V_1_4_0)) {
assertThat(multiGetShardRequest2.ignoreErrorsOnGeneratedFields(), equalTo(multiGetShardRequest.ignoreErrorsOnGeneratedFields()));
} else {
assertThat(multiGetShardRequest2.ignoreErrorsOnGeneratedFields(), equalTo(false));
}
assertThat(multiGetShardRequest2.items.size(), equalTo(multiGetShardRequest.items.size()));
for (int i = 0; i < multiGetShardRequest2.items.size(); i++) {
MultiGetRequest.Item item = multiGetShardRequest.items.get(i);
MultiGetRequest.Item item2 = multiGetShardRequest2.items.get(i);
if (in.getVersion().onOrAfter(Version.V_1_4_0)) {
assertThat(item2.index(), equalTo(item.index()));
} else {
//before 1.4 we have only one index, the concrete one
assertThat(item2.index(), equalTo(multiGetShardRequest.index()));
}
assertThat(item2.type(), equalTo(item.type()));
assertThat(item2.id(), equalTo(item.id()));
assertThat(item2.fields(), equalTo(item.fields()));
assertThat(item2.version(), equalTo(item.version()));
assertThat(item2.versionType(), equalTo(item.versionType()));
assertThat(item2.fetchSourceContext(), equalTo(item.fetchSourceContext()));
}
if (in.getVersion().onOrAfter(Version.V_1_4_0)) {
//we don't serialize the original index before 1.4, it'll get the concrete one
assertThat(multiGetShardRequest2.indices(), equalTo(multiGetShardRequest.indices()));
assertThat(multiGetShardRequest2.indicesOptions(), equalTo(multiGetShardRequest.indicesOptions()));
}
}
}

View File

@ -36,7 +36,7 @@ import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse;
import org.elasticsearch.action.deletebyquery.IndexDeleteByQueryResponse;
import org.elasticsearch.action.explain.ExplainResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.get.*;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
@ -754,6 +754,39 @@ public class BasicBackwardsCompatibilityTest extends ElasticsearchBackwardsCompa
assertThat(indicesStatsResponse.getIndices().containsKey("test"), equalTo(true));
}
@Test
public void testMultiGet() throws ExecutionException, InterruptedException {
assertAcked(prepareCreate("test").addAlias(new Alias("alias")));
ensureGreen("test");
int numDocs = iterations(10, 50);
IndexRequestBuilder[] indexRequestBuilders = new IndexRequestBuilder[numDocs];
for (int i = 0; i < numDocs; i++) {
indexRequestBuilders[i] = client().prepareIndex("test", "type", Integer.toString(i)).setSource("field", "value" + Integer.toString(i));
}
indexRandom(false, indexRequestBuilders);
int iterations = iterations(1, numDocs);
MultiGetRequestBuilder multiGetRequestBuilder = client().prepareMultiGet();
for (int i = 0; i < iterations; i++) {
multiGetRequestBuilder.add(new MultiGetRequest.Item(indexOrAlias(), "type", Integer.toString(randomInt(numDocs - 1))));
}
MultiGetResponse multiGetResponse = multiGetRequestBuilder.get();
assertThat(multiGetResponse.getResponses().length, equalTo(iterations));
for (int i = 0; i < multiGetResponse.getResponses().length; i++) {
MultiGetItemResponse multiGetItemResponse = multiGetResponse.getResponses()[i];
assertThat(multiGetItemResponse.isFailed(), equalTo(false));
assertThat(multiGetItemResponse.getIndex(), equalTo("test"));
assertThat(multiGetItemResponse.getType(), equalTo("type"));
assertThat(multiGetItemResponse.getId(), equalTo(multiGetRequestBuilder.request().getItems().get(i).id()));
assertThat(multiGetItemResponse.getResponse().isExists(), equalTo(true));
assertThat(multiGetItemResponse.getResponse().getIndex(), equalTo("test"));
assertThat(multiGetItemResponse.getResponse().getType(), equalTo("type"));
assertThat(multiGetItemResponse.getResponse().getId(), equalTo(multiGetRequestBuilder.request().getItems().get(i).id()));
}
}
private static String indexOrAlias() {
return randomBoolean() ? "test" : "alias";
}