From ca4f27f40ea63a3a0435beca8a5b127fced64753 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Wed, 24 Sep 2014 14:54:50 +0200 Subject: [PATCH] Core: Added `_shards` header to all write responses. The header indicates to how many shard copies (primary and replicas shards) a write was supposed to go to, to how many shard copies to write succeeded and potentially captures shard failures if writing into a replica shard fails. For async writes it also includes the number of shards a write is still pending. Closes #7994 --- docs/reference/docs/delete-by-query.asciidoc | 6 +- docs/reference/docs/delete.asciidoc | 5 + docs/reference/docs/index_.asciidoc | 19 ++ docs/reference/migration/migrate_2_0.asciidoc | 10 +- .../test/delete/11_shard_header.yaml | 36 +++ rest-api-spec/test/index/11_shard_header.yaml | 56 ++++ .../test/update/11_shard_header.yaml | 39 +++ .../action/ActionWriteResponse.java | 298 ++++++++++++++++++ .../delete/TransportDeleteMappingAction.java | 9 +- .../action/bulk/BulkItemResponse.java | 8 +- .../action/bulk/BulkShardResponse.java | 4 +- .../action/bulk/TransportBulkAction.java | 11 +- .../action/bulk/TransportShardBulkAction.java | 22 +- .../action/delete/DeleteResponse.java | 4 +- .../action/delete/IndexDeleteResponse.java | 31 +- .../action/delete/ShardDeleteResponse.java | 4 +- .../action/delete/TransportDeleteAction.java | 11 +- .../delete/TransportIndexDeleteAction.java | 15 +- .../delete/TransportShardDeleteAction.java | 8 +- .../deletebyquery/DeleteByQueryResponse.java | 14 +- .../IndexDeleteByQueryResponse.java | 59 +--- .../ShardDeleteByQueryResponse.java | 17 +- .../TransportDeleteByQueryAction.java | 2 +- .../TransportIndexDeleteByQueryAction.java | 13 +- .../TransportShardDeleteByQueryAction.java | 7 +- .../action/index/IndexResponse.java | 4 +- .../action/index/TransportIndexAction.java | 11 +- ...nsportIndexReplicationOperationAction.java | 90 ++++-- ...portIndicesReplicationOperationAction.java | 7 +- ...nsportShardReplicationOperationAction.java | 212 +++++++------ .../action/update/TransportUpdateAction.java | 6 +- .../action/update/UpdateResponse.java | 14 +- .../rest/action/bulk/RestBulkAction.java | 9 +- .../rest/action/delete/RestDeleteAction.java | 9 +- .../RestDeleteByQueryAction.java | 21 +- .../rest/action/index/RestIndexAction.java | 13 +- .../rest/action/update/RestUpdateAction.java | 11 +- .../BasicBackwardsCompatibilityTest.java | 2 +- .../deleteByQuery/DeleteByQueryTests.java | 41 +-- .../document/DocumentActionsTests.java | 5 +- .../document/ShardInfoTests.java | 185 +++++++++++ .../child/SimpleChildQuerySearchTests.java | 18 +- 42 files changed, 984 insertions(+), 382 deletions(-) create mode 100644 rest-api-spec/test/delete/11_shard_header.yaml create mode 100644 rest-api-spec/test/index/11_shard_header.yaml create mode 100644 rest-api-spec/test/update/11_shard_header.yaml create mode 100644 src/main/java/org/elasticsearch/action/ActionWriteResponse.java create mode 100644 src/test/java/org/elasticsearch/document/ShardInfoTests.java diff --git a/docs/reference/docs/delete-by-query.asciidoc b/docs/reference/docs/delete-by-query.asciidoc index d7c70597375..5783e75c4e7 100644 --- a/docs/reference/docs/delete-by-query.asciidoc +++ b/docs/reference/docs/delete-by-query.asciidoc @@ -32,9 +32,9 @@ commands is: "_indices" : { "twitter" : { "_shards" : { - "total" : 5, - "successful" : 5, - "failed" : 0 + "total" : 10, + "failed" : 0, + "successful" : 10, } } } diff --git a/docs/reference/docs/delete.asciidoc b/docs/reference/docs/delete.asciidoc index b9c65959796..6a5e962bbb0 100644 --- a/docs/reference/docs/delete.asciidoc +++ b/docs/reference/docs/delete.asciidoc @@ -16,6 +16,11 @@ The result of the above delete operation is: [source,js] -------------------------------------------------- { + "_shards" : { + "total" : 10, + "failed" : 0, + "successful" : 10 + }, "found" : true, "_index" : "twitter", "_type" : "tweet", diff --git a/docs/reference/docs/index_.asciidoc b/docs/reference/docs/index_.asciidoc index 01ce382e965..4ab5eb45c41 100644 --- a/docs/reference/docs/index_.asciidoc +++ b/docs/reference/docs/index_.asciidoc @@ -19,6 +19,11 @@ The result of the above index operation is: [source,js] -------------------------------------------------- { + "_shards" : { + "total" : 10, + "failed" : 0, + "successful" : 10 + }, "_index" : "twitter", "_type" : "tweet", "_id" : "1", @@ -27,6 +32,20 @@ The result of the above index operation is: } -------------------------------------------------- +The `_shards` header provides information about the replication process of the index operation. +* `total` - Indicates to how many shard copies (primary and replica shards) the index operation should be executed on. +* `successful`- Indicates the number of shard copies the index operation succeeded on. +* `pending` - Indicates how many shard copies this index operation still needs to go to at the time index operation + succeeded on the primary shard. This field is only returned if `async` replication is used. +* `failures` - An array that contains replication related errors in the case an index operation failed on a replica shard. + +The index operation is successful in the case `successful` is at least 1. + +NOTE: Replica shards may not all be started when an indexing operation successfully returns (by default, a quorum is + required). In that case, `total` will be equal to the total shards based on the index replica settings and + `successful` will be equal to the number of shard started (primary plus replicas). As there were no failures, + the `failed` will be 0. + [float] [[index-creation]] === Automatic Index Creation diff --git a/docs/reference/migration/migrate_2_0.asciidoc b/docs/reference/migration/migrate_2_0.asciidoc index 63eba254847..574f5c57a74 100644 --- a/docs/reference/migration/migrate_2_0.asciidoc +++ b/docs/reference/migration/migrate_2_0.asciidoc @@ -114,4 +114,12 @@ well. The `parent` parameter has been removed from the update request. Before 2.x it just set the routing parameter. The `routing` setting should be used instead. The `parent` setting was confusing, because it had the impression that the parent -a child documents points to can be changed but this is not true. \ No newline at end of file +a child documents points to can be changed but this is not true. + +==== Delete by query + +The meaning of the `_shards` headers in the delete by query response has changed. Before version 2.0 the `total`, +`successful` and `failed` fields in the header are based on the number of primary shards. The failures on replica +shards aren't being kept track of. From version 2.0 the stats in the `_shards` header are based on all shards +of an index. The http status code is left unchanged and is only based on failures that occurred while executing on +primary shards. \ No newline at end of file diff --git a/rest-api-spec/test/delete/11_shard_header.yaml b/rest-api-spec/test/delete/11_shard_header.yaml new file mode 100644 index 00000000000..d1bb4c0df34 --- /dev/null +++ b/rest-api-spec/test/delete/11_shard_header.yaml @@ -0,0 +1,36 @@ +--- +"Delete check shard header": + + - do: + indices.create: + index: foobar + body: + settings: + number_of_shards: "1" + number_of_replicas: "0" + + - do: + cluster.health: + wait_for_status: green + + - do: + index: + index: foobar + type: baz + id: 1 + body: { foo: bar } + + - do: + delete: + index: foobar + type: baz + id: 1 + + - match: { _index: foobar } + - match: { _type: baz } + - match: { _id: "1"} + - match: { _version: 2} + - match: { _shards.total: 1} + - match: { _shards.successful: 1} + - match: { _shards.failed: 0} + - is_false: _shards.pending diff --git a/rest-api-spec/test/index/11_shard_header.yaml b/rest-api-spec/test/index/11_shard_header.yaml new file mode 100644 index 00000000000..e9264905df8 --- /dev/null +++ b/rest-api-spec/test/index/11_shard_header.yaml @@ -0,0 +1,56 @@ +--- +"Index check shard header": + + - do: + indices.create: + index: foobar1 + body: + settings: + number_of_shards: "1" + number_of_replicas: "0" + + + - do: + indices.create: + index: foobar2 + body: + settings: + number_of_shards: "1" + number_of_replicas: "1" + + - do: + cluster.health: + wait_for_status: green + + - do: + index: + index: foobar1 + type: baz + id: 1 + body: { foo: bar } + + - match: { _index: foobar1 } + - match: { _type: baz } + - match: { _id: "1"} + - match: { _version: 1} + - match: { _shards.total: 1} + - match: { _shards.successful: 1} + - match: { _shards.failed: 0} + - is_false: _shards.pending + + - do: + index: + index: foobar2 + type: baz + id: 1 + replication: async + body: { foo: bar } + + - match: { _index: foobar2 } + - match: { _type: baz } + - match: { _id: "1"} + - match: { _version: 1} + - match: { _shards.total: 2} + - match: { _shards.successful: 1} + - match: { _shards.failed: 0} + - match: { _shards.pending: 1} diff --git a/rest-api-spec/test/update/11_shard_header.yaml b/rest-api-spec/test/update/11_shard_header.yaml new file mode 100644 index 00000000000..eb2e4ff9a91 --- /dev/null +++ b/rest-api-spec/test/update/11_shard_header.yaml @@ -0,0 +1,39 @@ +--- +"Update check shard header": + + - do: + indices.create: + index: foobar + body: + settings: + number_of_shards: "1" + number_of_replicas: "0" + + - do: + cluster.health: + wait_for_status: green + + - do: + index: + index: foobar + type: baz + id: 1 + body: { foo: bar } + + - do: + update: + index: foobar + type: baz + id: 1 + body: + doc: + foo: baz + + - match: { _index: foobar } + - match: { _type: baz } + - match: { _id: "1"} + - match: { _version: 2} + - match: { _shards.total: 1} + - match: { _shards.successful: 1} + - match: { _shards.failed: 0} + - is_false: _shards.pending diff --git a/src/main/java/org/elasticsearch/action/ActionWriteResponse.java b/src/main/java/org/elasticsearch/action/ActionWriteResponse.java new file mode 100644 index 00000000000..35cf05b88e8 --- /dev/null +++ b/src/main/java/org/elasticsearch/action/ActionWriteResponse.java @@ -0,0 +1,298 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action; + +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentBuilderString; +import org.elasticsearch.rest.RestStatus; + +import java.io.IOException; + +/** + * Base class for write action responses. + */ +public abstract class ActionWriteResponse extends ActionResponse { + + public final static ActionWriteResponse.ShardInfo.Failure[] EMPTY = new ActionWriteResponse.ShardInfo.Failure[0]; + + private ShardInfo shardInfo; + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + shardInfo = ActionWriteResponse.ShardInfo.readShardInfo(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + shardInfo.writeTo(out); + } + + public ShardInfo getShardInfo() { + return shardInfo; + } + + public void setShardInfo(ShardInfo shardInfo) { + this.shardInfo = shardInfo; + } + + public static class ShardInfo implements Streamable, ToXContent { + + private int total; + private int successful; + private int pending; + private Failure[] failures = EMPTY; + + public ShardInfo() { + } + + public ShardInfo(int total, int successful, int pending, Failure... failures) { + assert total >= 0 && successful >= 0 && pending >= 0; + this.total = total; + this.successful = successful; + this.pending = pending; + this.failures = failures; + } + + /** + * @return the total number of shards the write should go to. + */ + public int getTotal() { + return total; + } + + /** + * @return the total number of shards the write succeeded on. + */ + public int getSuccessful() { + return successful; + } + + /** + * @return the total number of shards a write is still to be performed on at the time this response was + * created. Typically this will only contain 0, but when async replication is used this number is higher than 0. + */ + public int getPending() { + return pending; + } + + /** + * @return The total number of replication failures. + */ + public int getFailed() { + return failures.length; + } + + /** + * @return The replication failures that have been captured in the case writes have failed on replica shards. + */ + public Failure[] getFailures() { + return failures; + } + + public RestStatus status() { + RestStatus status = RestStatus.OK; + for (Failure failure : failures) { + if (failure.primary() && failure.status().getStatus() > status.getStatus()) { + status = failure.status(); + } + } + return status; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + total = in.readVInt(); + successful = in.readVInt(); + pending = in.readVInt(); + int size = in.readVInt(); + failures = new Failure[size]; + for (int i = 0; i < size; i++) { + Failure failure = new Failure(); + failure.readFrom(in); + failures[i] = failure; + } + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVInt(total); + out.writeVInt(successful); + out.writeVInt(pending); + out.writeVInt(failures.length); + for (Failure failure : failures) { + failure.writeTo(out); + } + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(Fields._SHARDS); + builder.field(Fields.TOTAL, total); + builder.field(Fields.SUCCESSFUL, successful); + if (pending > 0) { + builder.field(Fields.PENDING, pending); + } + builder.field(Fields.FAILED, getFailed()); + if (failures.length > 0) { + builder.startArray(Fields.FAILURES); + for (Failure failure : failures) { + failure.toXContent(builder, params); + } + builder.endArray(); + } + builder.endObject(); + return builder; + } + + public static ShardInfo readShardInfo(StreamInput in) throws IOException { + ShardInfo shardInfo = new ShardInfo(); + shardInfo.readFrom(in); + return shardInfo; + } + + public static class Failure implements ShardOperationFailedException, ToXContent { + + private String index; + private int shardId; + private String nodeId; + private String reason; + private RestStatus status; + private boolean primary; + + public Failure(String index, int shardId, @Nullable String nodeId, String reason, RestStatus status, boolean primary) { + this.index = index; + this.shardId = shardId; + this.nodeId = nodeId; + this.reason = reason; + this.status = status; + this.primary = primary; + } + + Failure() { + } + + /** + * @return On what index the failure occurred. + */ + public String index() { + return index; + } + + /** + * @return On what shard id the failure occurred. + */ + public int shardId() { + return shardId; + } + + /** + * @return On what node the failure occurred. + */ + @Nullable + public String nodeId() { + return nodeId; + } + + /** + * @return A text description of the failure + */ + public String reason() { + return reason; + } + + /** + * @return The status to report if this failure was a primary failure. + */ + public RestStatus status() { + return status; + } + + /** + * @return Whether this failure occurred on a primary shard. + * (this only reports true for delete by query) + */ + public boolean primary() { + return primary; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + index = in.readString(); + shardId = in.readVInt(); + nodeId = in.readOptionalString(); + reason = in.readString(); + status = RestStatus.readFrom(in); + primary = in.readBoolean(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(index); + out.writeVInt(shardId); + out.writeOptionalString(nodeId); + out.writeString(reason); + RestStatus.writeTo(out, status); + out.writeBoolean(primary); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(Fields._INDEX, index); + builder.field(Fields._SHARD, shardId); + builder.field(Fields._NODE, nodeId); + builder.field(Fields.REASON, reason); + builder.field(Fields.STATUS, status); + builder.field(Fields.PRIMARY, primary); + builder.endObject(); + return builder; + } + + private static class Fields { + + private static final XContentBuilderString _INDEX = new XContentBuilderString("_index"); + private static final XContentBuilderString _SHARD = new XContentBuilderString("_shard"); + private static final XContentBuilderString _NODE = new XContentBuilderString("_node"); + private static final XContentBuilderString REASON = new XContentBuilderString("reason"); + private static final XContentBuilderString STATUS = new XContentBuilderString("status"); + private static final XContentBuilderString PRIMARY = new XContentBuilderString("primary"); + + } + } + + private static class Fields { + + private static final XContentBuilderString _SHARDS = new XContentBuilderString("_shards"); + private static final XContentBuilderString TOTAL = new XContentBuilderString("total"); + private static final XContentBuilderString SUCCESSFUL = new XContentBuilderString("successful"); + private static final XContentBuilderString PENDING = new XContentBuilderString("pending"); + private static final XContentBuilderString FAILED = new XContentBuilderString("failed"); + private static final XContentBuilderString FAILURES = new XContentBuilderString("failures"); + + } + } +} diff --git a/src/main/java/org/elasticsearch/action/admin/indices/mapping/delete/TransportDeleteMappingAction.java b/src/main/java/org/elasticsearch/action/admin/indices/mapping/delete/TransportDeleteMappingAction.java index ba1deb9107a..02d1b798e17 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/mapping/delete/TransportDeleteMappingAction.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/mapping/delete/TransportDeleteMappingAction.java @@ -22,6 +22,7 @@ package org.elasticsearch.action.admin.indices.mapping.delete; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionWriteResponse; import org.elasticsearch.action.ShardOperationFailedException; import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.admin.indices.flush.FlushResponse; @@ -147,11 +148,9 @@ public class TransportDeleteMappingAction extends TransportMasterNodeOperationAc public void onResponse(DeleteByQueryResponse deleteByQueryResponse) { if (logger.isTraceEnabled()) { for (IndexDeleteByQueryResponse indexResponse : deleteByQueryResponse) { - logger.trace("Delete by query[{}] completed with total[{}], successful[{}] and failed[{}]", indexResponse.getIndex(), indexResponse.getTotalShards(), indexResponse.getSuccessfulShards(), indexResponse.getFailedShards()); - if (indexResponse.getFailedShards() > 0) { - for (ShardOperationFailedException failure : indexResponse.getFailures()) { - logger.trace("[{}/{}] Delete by query shard failure reason: {}", failure.index(), failure.shardId(), failure.reason()); - } + logger.trace("Delete by query for index [{}] completed with total[{}], successful[{}] and failed[{}]", indexResponse.getIndex(), indexResponse.getShardInfo().getTotal(), indexResponse.getShardInfo().getSuccessful(), indexResponse.getShardInfo().getFailed()); + for (ActionWriteResponse.ShardInfo.Failure failure : indexResponse.getShardInfo().getFailures()) { + logger.trace("[{}/{}/{}/{}] Delete by query shard failure reason: {}", failure.index(), failure.shardId(), failure.primary(), failure.nodeId(), failure.reason()); } } } diff --git a/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java b/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java index 7e813c33e2c..a95a7ab0c8a 100644 --- a/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java +++ b/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java @@ -20,7 +20,7 @@ package org.elasticsearch.action.bulk; import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionWriteResponse; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.update.UpdateResponse; @@ -104,7 +104,7 @@ public class BulkItemResponse implements Streamable { private String opType; - private ActionResponse response; + private ActionWriteResponse response; private Failure failure; @@ -112,7 +112,7 @@ public class BulkItemResponse implements Streamable { } - public BulkItemResponse(int id, String opType, ActionResponse response) { + public BulkItemResponse(int id, String opType, ActionWriteResponse response) { this.id = id; this.opType = opType; this.response = response; @@ -210,7 +210,7 @@ public class BulkItemResponse implements Streamable { * The actual response ({@link IndexResponse} or {@link DeleteResponse}). null in * case of failure. */ - public T getResponse() { + public T getResponse() { return (T) response; } diff --git a/src/main/java/org/elasticsearch/action/bulk/BulkShardResponse.java b/src/main/java/org/elasticsearch/action/bulk/BulkShardResponse.java index 83d5b65f79f..6b08627f5de 100644 --- a/src/main/java/org/elasticsearch/action/bulk/BulkShardResponse.java +++ b/src/main/java/org/elasticsearch/action/bulk/BulkShardResponse.java @@ -19,7 +19,7 @@ package org.elasticsearch.action.bulk; -import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionWriteResponse; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.index.shard.ShardId; @@ -29,7 +29,7 @@ import java.io.IOException; /** * */ -public class BulkShardResponse extends ActionResponse { +public class BulkShardResponse extends ActionWriteResponse { private ShardId shardId; private BulkItemResponse[] responses; diff --git a/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index 4b68c6b828e..6e48349e25c 100644 --- a/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -22,7 +22,6 @@ package org.elasticsearch.action.bulk; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; - import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ExceptionsHelper; @@ -60,11 +59,7 @@ import org.elasticsearch.rest.RestStatus; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import java.util.HashMap; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.concurrent.atomic.AtomicInteger; /** @@ -319,6 +314,10 @@ public class TransportBulkAction extends HandledTransportAction shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) { + protected Tuple shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) { final BulkShardRequest request = shardRequest.request; IndexService indexService = indicesService.indexServiceSafe(request.index()); IndexShard indexShard = indexService.shardSafe(shardRequest.shardId.id()); @@ -242,7 +243,6 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation updateResult = new UpdateResult(null, null, false, t, null); } if (updateResult.success()) { - switch (updateResult.result.operation()) { case UPSERT: case INDEX: @@ -251,7 +251,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation BytesReference indexSourceAsBytes = indexRequest.source(); // add the response IndexResponse indexResponse = result.response(); - UpdateResponse updateResponse = new UpdateResponse(indexResponse.getIndex(), indexResponse.getType(), indexResponse.getId(), indexResponse.getVersion(), indexResponse.isCreated()); + UpdateResponse updateResponse = new UpdateResponse(indexResponse.getShardInfo(), indexResponse.getIndex(), indexResponse.getType(), indexResponse.getId(), indexResponse.getVersion(), indexResponse.isCreated()); if (updateRequest.fields() != null && updateRequest.fields().length > 0) { Tuple> sourceAndContent = XContentHelper.convertToMap(indexSourceAsBytes, true); updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, shardRequest.request.index(), indexResponse.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), indexSourceAsBytes)); @@ -272,7 +272,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation case DELETE: DeleteResponse response = updateResult.writeResult.response(); DeleteRequest deleteRequest = updateResult.request(); - updateResponse = new UpdateResponse(response.getIndex(), response.getType(), response.getId(), response.getVersion(), false); + updateResponse = new UpdateResponse(response.getShardInfo(), response.getIndex(), response.getType(), response.getId(), response.getVersion(), false); updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, shardRequest.request.index(), response.getVersion(), updateResult.result.updatedSourceAsMap(), updateResult.result.updateSourceContentType(), null)); // Replace the update request to the translated delete request to execute on the replica. item = request.items()[requestIndex] = new BulkItemRequest(request.items()[requestIndex].id(), deleteRequest); @@ -365,8 +365,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation for (int i = 0; i < items.length; i++) { responses[i] = items[i].getPrimaryResponse(); } - BulkShardResponse response = new BulkShardResponse(shardRequest.shardId, responses); - return new PrimaryResponse<>(shardRequest.request, response, ops); + return new Tuple<>(new BulkShardResponse(shardRequest.shardId, responses), shardRequest.request); } private void setResponse(BulkItemRequest request, BulkItemResponse response) { @@ -378,18 +377,21 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation static class WriteResult { - final Object response; + final ActionWriteResponse response; final String mappingTypeToUpdate; final Engine.IndexingOperation op; - WriteResult(Object response, String mappingTypeToUpdate, Engine.IndexingOperation op) { + WriteResult(ActionWriteResponse response, String mappingTypeToUpdate, Engine.IndexingOperation op) { this.response = response; this.mappingTypeToUpdate = mappingTypeToUpdate; this.op = op; } @SuppressWarnings("unchecked") - T response() { + T response() { + // this sets total, pending and failed to 0 and this is ok, because we will embed this into the replica + // request and not use it + response.setShardInfo(new ActionWriteResponse.ShardInfo()); return (T) response; } diff --git a/src/main/java/org/elasticsearch/action/delete/DeleteResponse.java b/src/main/java/org/elasticsearch/action/delete/DeleteResponse.java index 40645ee4c00..46a2ded0671 100644 --- a/src/main/java/org/elasticsearch/action/delete/DeleteResponse.java +++ b/src/main/java/org/elasticsearch/action/delete/DeleteResponse.java @@ -19,7 +19,7 @@ package org.elasticsearch.action.delete; -import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionWriteResponse; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -31,7 +31,7 @@ import java.io.IOException; * @see org.elasticsearch.action.delete.DeleteRequest * @see org.elasticsearch.client.Client#delete(DeleteRequest) */ -public class DeleteResponse extends ActionResponse { +public class DeleteResponse extends ActionWriteResponse { private String index; private String id; diff --git a/src/main/java/org/elasticsearch/action/delete/IndexDeleteResponse.java b/src/main/java/org/elasticsearch/action/delete/IndexDeleteResponse.java index 079a9bc7bf8..e691837751c 100644 --- a/src/main/java/org/elasticsearch/action/delete/IndexDeleteResponse.java +++ b/src/main/java/org/elasticsearch/action/delete/IndexDeleteResponse.java @@ -19,7 +19,7 @@ package org.elasticsearch.action.delete; -import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionWriteResponse; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -28,17 +28,13 @@ import java.io.IOException; /** * Delete by query response executed on a specific index. */ -public class IndexDeleteResponse extends ActionResponse { +public class IndexDeleteResponse extends ActionWriteResponse { private String index; - private int successfulShards; - private int failedShards; private ShardDeleteResponse[] deleteResponses; - IndexDeleteResponse(String index, int failedShards, ShardDeleteResponse[] deleteResponses) { + IndexDeleteResponse(String index, ShardDeleteResponse[] deleteResponses) { this.index = index; - this.successfulShards = deleteResponses.length; - this.failedShards = failedShards; this.deleteResponses = deleteResponses; } @@ -49,27 +45,6 @@ public class IndexDeleteResponse extends ActionResponse { return this.index; } - /** - * The total number of shards the delete by query was executed on. - */ - public int getTotalShards() { - return failedShards + successfulShards; - } - - /** - * The successful number of shards the delete by query was executed on. - */ - public int getSuccessfulShards() { - return successfulShards; - } - - /** - * The failed number of shards the delete by query was executed on. - */ - public int getFailedShards() { - return failedShards; - } - public ShardDeleteResponse[] getResponses() { return this.deleteResponses; } diff --git a/src/main/java/org/elasticsearch/action/delete/ShardDeleteResponse.java b/src/main/java/org/elasticsearch/action/delete/ShardDeleteResponse.java index 4e666857c0e..1109d539f80 100644 --- a/src/main/java/org/elasticsearch/action/delete/ShardDeleteResponse.java +++ b/src/main/java/org/elasticsearch/action/delete/ShardDeleteResponse.java @@ -19,7 +19,7 @@ package org.elasticsearch.action.delete; -import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionWriteResponse; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -28,7 +28,7 @@ import java.io.IOException; /** * Delete response executed on a specific shard. */ -public class ShardDeleteResponse extends ActionResponse { +public class ShardDeleteResponse extends ActionWriteResponse { private long version; private boolean found; diff --git a/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java b/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java index ab913edf12f..04d3491413b 100644 --- a/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java +++ b/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java @@ -33,6 +33,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.routing.ShardIterator; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.settings.Settings; @@ -126,7 +127,9 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct break; } } - listener.onResponse(new DeleteResponse(request.concreteIndex(), request.request().type(), request.request().id(), version, found)); + DeleteResponse response = new DeleteResponse(request.concreteIndex(), request.request().type(), request.request().id(), version, found); + response.setShardInfo(indexDeleteResponse.getShardInfo()); + listener.onResponse(response); } @Override @@ -157,7 +160,7 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct @Override protected DeleteRequest newReplicaRequestInstance() { - return new DeleteRequest(); + return newRequestInstance(); } @Override @@ -166,7 +169,7 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct } @Override - protected PrimaryResponse shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) { + protected Tuple shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) { DeleteRequest request = shardRequest.request; IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.shardId.getIndex()).shardSafe(shardRequest.shardId.id()); Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY); @@ -186,7 +189,7 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct } DeleteResponse response = new DeleteResponse(shardRequest.shardId.getIndex(), request.type(), request.id(), delete.version(), delete.found()); - return new PrimaryResponse<>(shardRequest.request, response, null); + return new Tuple<>(response, shardRequest.request); } @Override diff --git a/src/main/java/org/elasticsearch/action/delete/TransportIndexDeleteAction.java b/src/main/java/org/elasticsearch/action/delete/TransportIndexDeleteAction.java index 1f9c89b7913..da2de614ed6 100644 --- a/src/main/java/org/elasticsearch/action/delete/TransportIndexDeleteAction.java +++ b/src/main/java/org/elasticsearch/action/delete/TransportIndexDeleteAction.java @@ -19,7 +19,7 @@ package org.elasticsearch.action.delete; -import org.elasticsearch.action.ShardOperationFailedException; +import org.elasticsearch.action.ActionWriteResponse; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.replication.TransportIndexReplicationOperationAction; import org.elasticsearch.cluster.ClusterService; @@ -34,7 +34,7 @@ import java.util.List; * Internal transport action that broadcasts a delete request to all of the shards that belongs to an index. * Used when routing is required but not specified within the delete request. */ -public class TransportIndexDeleteAction extends TransportIndexReplicationOperationAction { +public class TransportIndexDeleteAction extends TransportIndexReplicationOperationAction { private static final String ACTION_NAME = DeleteAction.NAME + "[index]"; @@ -45,13 +45,10 @@ public class TransportIndexDeleteAction extends TransportIndexReplicationOperati } @Override - protected IndexDeleteResponse newResponseInstance(IndexDeleteRequest request, List shardDeleteResponses, int failuresCount, List shardFailures) { - return new IndexDeleteResponse(request.index(), failuresCount, shardDeleteResponses.toArray(new ShardDeleteResponse[shardDeleteResponses.size()])); - } - - @Override - protected boolean accumulateExceptions() { - return false; + protected IndexDeleteResponse newResponseInstance(IndexDeleteRequest request, List shardDeleteResponses, ActionWriteResponse.ShardInfo shardInfo) { + IndexDeleteResponse indexDeleteResponse = new IndexDeleteResponse(request.index(), shardDeleteResponses.toArray(new ShardDeleteResponse[shardDeleteResponses.size()])); + indexDeleteResponse.setShardInfo(shardInfo); + return indexDeleteResponse; } @Override diff --git a/src/main/java/org/elasticsearch/action/delete/TransportShardDeleteAction.java b/src/main/java/org/elasticsearch/action/delete/TransportShardDeleteAction.java index 631a618a614..30a0b27d326 100644 --- a/src/main/java/org/elasticsearch/action/delete/TransportShardDeleteAction.java +++ b/src/main/java/org/elasticsearch/action/delete/TransportShardDeleteAction.java @@ -27,6 +27,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.cluster.routing.ShardIterator; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.VersionType; @@ -62,7 +63,7 @@ public class TransportShardDeleteAction extends TransportShardReplicationOperati @Override protected ShardDeleteRequest newReplicaRequestInstance() { - return new ShardDeleteRequest(); + return newRequestInstance(); } @Override @@ -81,7 +82,7 @@ public class TransportShardDeleteAction extends TransportShardReplicationOperati } @Override - protected PrimaryResponse shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) { + protected Tuple shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) { ShardDeleteRequest request = shardRequest.request; IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.shardId.getIndex()).shardSafe(shardRequest.shardId.id()); Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version(), VersionType.INTERNAL, Engine.Operation.Origin.PRIMARY); @@ -98,8 +99,7 @@ public class TransportShardDeleteAction extends TransportShardReplicationOperati } - ShardDeleteResponse response = new ShardDeleteResponse(delete.version(), delete.found()); - return new PrimaryResponse<>(shardRequest.request, response, null); + return new Tuple<>(new ShardDeleteResponse(delete.version(), delete.found()), shardRequest.request); } @Override diff --git a/src/main/java/org/elasticsearch/action/deletebyquery/DeleteByQueryResponse.java b/src/main/java/org/elasticsearch/action/deletebyquery/DeleteByQueryResponse.java index 92f98cf9b1b..b65744f40a2 100644 --- a/src/main/java/org/elasticsearch/action/deletebyquery/DeleteByQueryResponse.java +++ b/src/main/java/org/elasticsearch/action/deletebyquery/DeleteByQueryResponse.java @@ -64,18 +64,8 @@ public class DeleteByQueryResponse extends ActionResponse implements Iterable 0) { - RestStatus indexStatus = indexResponse.getFailures()[0].status(); - if (indexResponse.getFailures().length > 1) { - for (int i = 1; i < indexResponse.getFailures().length; i++) { - if (indexResponse.getFailures()[i].status().getStatus() >= 500) { - indexStatus = indexResponse.getFailures()[i].status(); - } - } - } - if (status.getStatus() < indexStatus.getStatus()) { - status = indexStatus; - } + if (indexResponse.getShardInfo().status().getStatus() > status.getStatus()) { + status = indexResponse.getShardInfo().status(); } } return status; diff --git a/src/main/java/org/elasticsearch/action/deletebyquery/IndexDeleteByQueryResponse.java b/src/main/java/org/elasticsearch/action/deletebyquery/IndexDeleteByQueryResponse.java index be3e5d196c9..2c8d4001bae 100644 --- a/src/main/java/org/elasticsearch/action/deletebyquery/IndexDeleteByQueryResponse.java +++ b/src/main/java/org/elasticsearch/action/deletebyquery/IndexDeleteByQueryResponse.java @@ -19,38 +19,25 @@ package org.elasticsearch.action.deletebyquery; -import org.elasticsearch.action.ActionResponse; -import org.elasticsearch.action.ShardOperationFailedException; -import org.elasticsearch.action.support.DefaultShardOperationFailedException; +import org.elasticsearch.action.ActionWriteResponse; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import java.io.IOException; -import java.util.List; /** * Delete by query response executed on a specific index. */ -public class IndexDeleteByQueryResponse extends ActionResponse { +public class IndexDeleteByQueryResponse extends ActionWriteResponse { private String index; - private int successfulShards; - private int failedShards; - private ShardOperationFailedException[] failures; - IndexDeleteByQueryResponse(String index, int successfulShards, int failedShards, List failures) { + IndexDeleteByQueryResponse(String index, ShardInfo failures) { this.index = index; - this.successfulShards = successfulShards; - this.failedShards = failedShards; - if (failures == null || failures.isEmpty()) { - this.failures = new DefaultShardOperationFailedException[0]; - } else { - this.failures = failures.toArray(new ShardOperationFailedException[failures.size()]); - } + setShardInfo(failures); } IndexDeleteByQueryResponse() { - } /** @@ -60,53 +47,15 @@ public class IndexDeleteByQueryResponse extends ActionResponse { return this.index; } - /** - * The total number of shards the delete by query was executed on. - */ - public int getTotalShards() { - return failedShards + successfulShards; - } - - /** - * The successful number of shards the delete by query was executed on. - */ - public int getSuccessfulShards() { - return successfulShards; - } - - /** - * The failed number of shards the delete by query was executed on. - */ - public int getFailedShards() { - return failedShards; - } - - public ShardOperationFailedException[] getFailures() { - return failures; - } - @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); index = in.readString(); - successfulShards = in.readVInt(); - failedShards = in.readVInt(); - int size = in.readVInt(); - failures = new ShardOperationFailedException[size]; - for (int i = 0; i < size; i++) { - failures[i] = DefaultShardOperationFailedException.readShardOperationFailed(in); - } } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeString(index); - out.writeVInt(successfulShards); - out.writeVInt(failedShards); - out.writeVInt(failures.length); - for (ShardOperationFailedException failure : failures) { - failure.writeTo(out); - } } } \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/action/deletebyquery/ShardDeleteByQueryResponse.java b/src/main/java/org/elasticsearch/action/deletebyquery/ShardDeleteByQueryResponse.java index 5dee7c61244..63640732d42 100644 --- a/src/main/java/org/elasticsearch/action/deletebyquery/ShardDeleteByQueryResponse.java +++ b/src/main/java/org/elasticsearch/action/deletebyquery/ShardDeleteByQueryResponse.java @@ -19,24 +19,11 @@ package org.elasticsearch.action.deletebyquery; -import org.elasticsearch.action.ActionResponse; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; - -import java.io.IOException; +import org.elasticsearch.action.ActionWriteResponse; /** * Delete by query response executed on a specific shard. */ -public class ShardDeleteByQueryResponse extends ActionResponse { +public class ShardDeleteByQueryResponse extends ActionWriteResponse { - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - } } \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/action/deletebyquery/TransportDeleteByQueryAction.java b/src/main/java/org/elasticsearch/action/deletebyquery/TransportDeleteByQueryAction.java index 2aeebd18f22..0800a639a81 100644 --- a/src/main/java/org/elasticsearch/action/deletebyquery/TransportDeleteByQueryAction.java +++ b/src/main/java/org/elasticsearch/action/deletebyquery/TransportDeleteByQueryAction.java @@ -40,7 +40,7 @@ import java.util.concurrent.atomic.AtomicReferenceArray; /** */ -public class TransportDeleteByQueryAction extends TransportIndicesReplicationOperationAction { +public class TransportDeleteByQueryAction extends TransportIndicesReplicationOperationAction { private final DestructiveOperations destructiveOperations; diff --git a/src/main/java/org/elasticsearch/action/deletebyquery/TransportIndexDeleteByQueryAction.java b/src/main/java/org/elasticsearch/action/deletebyquery/TransportIndexDeleteByQueryAction.java index ae1f42de9a4..607459e7798 100644 --- a/src/main/java/org/elasticsearch/action/deletebyquery/TransportIndexDeleteByQueryAction.java +++ b/src/main/java/org/elasticsearch/action/deletebyquery/TransportIndexDeleteByQueryAction.java @@ -19,7 +19,7 @@ package org.elasticsearch.action.deletebyquery; -import org.elasticsearch.action.ShardOperationFailedException; +import org.elasticsearch.action.ActionWriteResponse; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.replication.TransportIndexReplicationOperationAction; import org.elasticsearch.cluster.ClusterService; @@ -36,7 +36,7 @@ import java.util.List; /** * Internal transport action that broadcasts a delete by query request to all of the shards that belong to an index. */ -public class TransportIndexDeleteByQueryAction extends TransportIndexReplicationOperationAction { +public class TransportIndexDeleteByQueryAction extends TransportIndexReplicationOperationAction { private static final String ACTION_NAME = DeleteByQueryAction.NAME + "[index]"; @@ -47,13 +47,8 @@ public class TransportIndexDeleteByQueryAction extends TransportIndexReplication } @Override - protected IndexDeleteByQueryResponse newResponseInstance(IndexDeleteByQueryRequest request, List shardDeleteByQueryResponses, int failuresCount, List shardFailures) { - return new IndexDeleteByQueryResponse(request.index(), shardDeleteByQueryResponses.size(), failuresCount, shardFailures); - } - - @Override - protected boolean accumulateExceptions() { - return true; + protected IndexDeleteByQueryResponse newResponseInstance(IndexDeleteByQueryRequest request, List shardDeleteByQueryResponses, ActionWriteResponse.ShardInfo shardInfo) { + return new IndexDeleteByQueryResponse(request.index(), shardInfo); } @Override diff --git a/src/main/java/org/elasticsearch/action/deletebyquery/TransportShardDeleteByQueryAction.java b/src/main/java/org/elasticsearch/action/deletebyquery/TransportShardDeleteByQueryAction.java index dd74d4ba7c9..6e364302e83 100644 --- a/src/main/java/org/elasticsearch/action/deletebyquery/TransportShardDeleteByQueryAction.java +++ b/src/main/java/org/elasticsearch/action/deletebyquery/TransportShardDeleteByQueryAction.java @@ -28,6 +28,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.cluster.routing.ShardIterator; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; @@ -84,7 +85,7 @@ public class TransportShardDeleteByQueryAction extends TransportShardReplication @Override protected ShardDeleteByQueryRequest newReplicaRequestInstance() { - return new ShardDeleteByQueryRequest(); + return newRequestInstance(); } @Override @@ -98,7 +99,7 @@ public class TransportShardDeleteByQueryAction extends TransportShardReplication } @Override - protected PrimaryResponse shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) { + protected Tuple shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) { ShardDeleteByQueryRequest request = shardRequest.request; IndexService indexService = indicesService.indexServiceSafe(shardRequest.shardId.getIndex()); IndexShard indexShard = indexService.shardSafe(shardRequest.shardId.id()); @@ -115,7 +116,7 @@ public class TransportShardDeleteByQueryAction extends TransportShardReplication SearchContext.removeCurrent(); } } - return new PrimaryResponse<>(shardRequest.request, new ShardDeleteByQueryResponse(), null); + return new Tuple<>(new ShardDeleteByQueryResponse(), shardRequest.request); } diff --git a/src/main/java/org/elasticsearch/action/index/IndexResponse.java b/src/main/java/org/elasticsearch/action/index/IndexResponse.java index c64d3916448..818fd635981 100644 --- a/src/main/java/org/elasticsearch/action/index/IndexResponse.java +++ b/src/main/java/org/elasticsearch/action/index/IndexResponse.java @@ -19,7 +19,7 @@ package org.elasticsearch.action.index; -import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionWriteResponse; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -31,7 +31,7 @@ import java.io.IOException; * @see org.elasticsearch.action.index.IndexRequest * @see org.elasticsearch.client.Client#index(IndexRequest) */ -public class IndexResponse extends ActionResponse { +public class IndexResponse extends ActionWriteResponse { private String index; private String id; diff --git a/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java b/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java index 50e53b906d7..c2b0594837a 100644 --- a/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java +++ b/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java @@ -36,6 +36,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.routing.ShardIterator; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.engine.Engine; @@ -146,7 +147,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi @Override protected IndexRequest newReplicaRequestInstance() { - return new IndexRequest(); + return newRequestInstance(); } @Override @@ -166,7 +167,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi } @Override - protected PrimaryResponse shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) { + protected Tuple shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) { final IndexRequest request = shardRequest.request; // validate, if routing is required, that we got routing @@ -184,7 +185,6 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi .routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl()); long version; boolean created; - Engine.IndexingOperation op; if (request.opType() == IndexRequest.OpType.INDEX) { Engine.Index index = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates()); if (index.parsedDoc().mappingsModified()) { @@ -192,7 +192,6 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi } indexShard.index(index); version = index.version(); - op = index; created = index.created(); } else { Engine.Create create = indexShard.prepareCreate(sourceToParse, @@ -202,7 +201,6 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi } indexShard.create(create); version = create.version(); - op = create; created = true; } if (request.refresh()) { @@ -219,8 +217,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi assert request.versionType().validateVersionForWrites(request.version()); - IndexResponse response = new IndexResponse(shardRequest.shardId.getIndex(), request.type(), request.id(), version, created); - return new PrimaryResponse<>(shardRequest.request, response, op); + return new Tuple<>(new IndexResponse(shardRequest.shardId.getIndex(), request.type(), request.id(), version, created), shardRequest.request); } @Override diff --git a/src/main/java/org/elasticsearch/action/support/replication/TransportIndexReplicationOperationAction.java b/src/main/java/org/elasticsearch/action/support/replication/TransportIndexReplicationOperationAction.java index 2704eb1bb85..52bd4ad2b46 100644 --- a/src/main/java/org/elasticsearch/action/support/replication/TransportIndexReplicationOperationAction.java +++ b/src/main/java/org/elasticsearch/action/support/replication/TransportIndexReplicationOperationAction.java @@ -19,13 +19,13 @@ package org.elasticsearch.action.support.replication; -import com.google.common.collect.Lists; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionResponse; -import org.elasticsearch.action.ShardOperationFailedException; +import org.elasticsearch.action.ActionWriteResponse; +import org.elasticsearch.action.ActionWriteResponse.ShardInfo.Failure; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.DefaultShardOperationFailedException; import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; @@ -34,8 +34,11 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.threadpool.ThreadPool; +import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReferenceArray; @@ -45,15 +48,15 @@ import java.util.concurrent.atomic.AtomicReferenceArray; * It relies on a shard sub-action that gets sent over the transport and executed on each of the shard. * The index provided with the request is expected to be a concrete index, properly resolved by the callers (parent actions). */ -public abstract class TransportIndexReplicationOperationAction +public abstract class TransportIndexReplicationOperationAction extends TransportAction { protected final ClusterService clusterService; - protected final TransportShardReplicationOperationAction shardAction; + protected final TransportShardReplicationOperationAction shardAction; protected TransportIndexReplicationOperationAction(Settings settings, String actionName, ClusterService clusterService, - ThreadPool threadPool, TransportShardReplicationOperationAction shardAction, ActionFilters actionFilters) { + ThreadPool threadPool, TransportShardReplicationOperationAction shardAction, ActionFilters actionFilters) { super(settings, actionName, threadPool, actionFilters); this.clusterService = clusterService; this.shardAction = shardAction; @@ -71,7 +74,7 @@ public abstract class TransportIndexReplicationOperationAction shardsResponses = new AtomicReferenceArray<>(groups.size()); for (final ShardIterator shardIt : groups) { - ShardRequest shardRequest = newShardRequestInstance(request, shardIt.shardId().id()); + final ShardRequest shardRequest = newShardRequestInstance(request, shardIt.shardId().id()); // TODO for now, we fork operations on shardIt of the index shardRequest.beforeLocalFork(); // optimize for local fork @@ -103,47 +106,74 @@ public abstract class TransportIndexReplicationOperationAction responses = Lists.newArrayList(); - List failures = Lists.newArrayList(); + List responses = new ArrayList<>(); + List failureList = new ArrayList<>(); + + int total = 0; + int pending = 0; + int successful = 0; for (int i = 0; i < shardsResponses.length(); i++) { ShardActionResult shardActionResult = shardsResponses.get(i); - if (shardActionResult == null) { - assert !accumulateExceptions(); - continue; - } + final ActionWriteResponse.ShardInfo sf; if (shardActionResult.isFailure()) { - assert accumulateExceptions() && shardActionResult.shardFailure != null; - failures.add(shardActionResult.shardFailure); + assert shardActionResult.shardInfoOnFailure != null; + sf = shardActionResult.shardInfoOnFailure; } else { responses.add(shardActionResult.shardResponse); + sf = shardActionResult.shardResponse.getShardInfo(); } + total += sf.getTotal(); + pending += sf.getPending(); + successful += sf.getSuccessful(); + failureList.addAll(Arrays.asList(sf.getFailures())); } + assert failureList.size() == 0 || numShardGroupFailures(failureList) == failureCounter.get(); - assert failures.size() == 0 || failures.size() == failureCounter.get(); - listener.onResponse(newResponseInstance(request, responses, failureCounter.get(), failures)); + final Failure[] failures; + if (failureList.isEmpty()) { + failures = ActionWriteResponse.EMPTY; + } else { + failures = failureList.toArray(new Failure[failureList.size()]); + } + listener.onResponse(newResponseInstance(request, responses, new ActionWriteResponse.ShardInfo(total, successful, pending, failures))); } } + + private int numShardGroupFailures(List failures) { + int numShardGroupFailures = 0; + for (Failure failure : failures) { + if (failure.primary()) { + numShardGroupFailures++; + } + } + return numShardGroupFailures; + } }); + } } - protected abstract Response newResponseInstance(Request request, List shardResponses, int failuresCount, List shardFailures); + protected abstract Response newResponseInstance(Request request, List shardResponses, ActionWriteResponse.ShardInfo shardInfo); protected abstract GroupShardsIterator shards(Request request) throws ElasticsearchException; protected abstract ShardRequest newShardRequestInstance(Request request, int shardId); - protected abstract boolean accumulateExceptions(); - protected ClusterBlockException checkGlobalBlock(ClusterState state, Request request) { return state.blocks().globalBlockedException(ClusterBlockLevel.WRITE); } @@ -155,22 +185,22 @@ public abstract class TransportIndexReplicationOperationAction + ShardRequest extends ShardReplicationOperationRequest, ShardResponse extends ActionWriteResponse> extends TransportAction { protected final ClusterService clusterService; - protected final TransportIndexReplicationOperationAction indexAction; + protected final TransportIndexReplicationOperationAction indexAction; protected TransportIndicesReplicationOperationAction(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, - TransportIndexReplicationOperationAction indexAction, ActionFilters actionFilters) { + TransportIndexReplicationOperationAction indexAction, ActionFilters actionFilters) { super(settings, actionName, threadPool, actionFilters); this.clusterService = clusterService; this.indexAction = indexAction; diff --git a/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java b/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java index b6569a850cc..43c2818dc92 100644 --- a/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java +++ b/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java @@ -19,7 +19,10 @@ package org.elasticsearch.action.support.replication; -import org.elasticsearch.*; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ElasticsearchIllegalStateException; +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.Version; import org.elasticsearch.action.*; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.IndicesOptions; @@ -34,11 +37,14 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.*; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.index.engine.DocumentAlreadyExistsException; import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.IndexService; @@ -51,12 +57,14 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.*; import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; /** */ -public abstract class TransportShardReplicationOperationAction extends TransportAction { +public abstract class TransportShardReplicationOperationAction extends TransportAction { protected final TransportService transportService; protected final ClusterService clusterService; @@ -105,17 +113,14 @@ public abstract class TransportShardReplicationOperationAction shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest); + /** + * @return A tuple containing not null values, as first value the result of the primary operation and as second value + * the request to be executed on the replica shards. + */ + protected abstract Tuple shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest); protected abstract void shardOperationOnReplica(ReplicaOperationRequest shardRequest); - /** - * Called once replica operations have been dispatched on the - */ - protected void postPrimaryOperation(InternalRequest request, PrimaryResponse response) { - - } - protected abstract ShardIterator shards(ClusterState clusterState, InternalRequest request) throws ElasticsearchException; protected abstract boolean checkWriteConsistency(); @@ -142,14 +147,6 @@ public abstract class TransportShardReplicationOperationActionfalse meaning operations - * will be executed on the replica. - */ - protected boolean ignoreReplicas() { - return false; - } - protected boolean retryPrimaryException(Throwable e) { return TransportActions.isShardNotAvailableException(e); } @@ -499,8 +496,9 @@ public abstract class TransportShardReplicationOperationAction response = shardOperationOnPrimary(clusterState, new PrimaryOperationRequest(primaryShardId, internalRequest.concreteIndex(), internalRequest.request())); - performReplicas(response); + PrimaryOperationRequest por = new PrimaryOperationRequest(primaryShardId, internalRequest.concreteIndex(), internalRequest.request()); + Tuple primaryResponse = shardOperationOnPrimary(clusterState, por); + performReplicas(por, primaryResponse); } catch (Throwable e) { internalRequest.request.setCanHaveDuplicates(); // shard has not been allocated yet, retry it here @@ -523,14 +521,8 @@ public abstract class TransportShardReplicationOperationAction response) { - if (ignoreReplicas()) { - postPrimaryOperation(internalRequest, response); - listener.onResponse(response.response()); - return; - } + void performReplicas(PrimaryOperationRequest por, Tuple primaryResponse) { ShardRouting shard; - // we double check on the state, if it got changed we need to make sure we take the latest one cause // maybe a replica shard started its recovery process and we need to apply it there... @@ -539,6 +531,7 @@ public abstract class TransportShardReplicationOperationAction response, final AtomicInteger counter, final ShardRouting shard, String nodeId, final IndexMetaData indexMetaData) { + void performOnReplica(final ReplicationState state, final ShardRouting shard, final String nodeId, final IndexMetaData indexMetaData) { // if we don't have that node, it means that it might have failed and will be created again, in // this case, we don't have to do the operation, and just let it failover if (!observer.observedState().nodes().nodeExists(nodeId)) { - if (counter.decrementAndGet() == 0) { - listener.onResponse(response.response()); - } + state.onReplicaFailure(nodeId, null); return; } - final ReplicaOperationRequest shardRequest = new ReplicaOperationRequest(shardIt.shardId(), response.replicaRequest()); + final ReplicaOperationRequest shardRequest = new ReplicaOperationRequest(shardIt.shardId(), state.replicaRequest()); if (!nodeId.equals(observer.observedState().nodes().localNodeId())) { final DiscoveryNode node = observer.observedState().nodes().get(nodeId); transportService.sendRequest(node, transportReplicaAction, shardRequest, transportOptions, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { @Override public void handleResponse(TransportResponse.Empty vResponse) { - finishIfPossible(); + state.onReplicaSuccess(); } @Override public void handleException(TransportException exp) { + state.onReplicaFailure(nodeId, exp); logger.trace("[{}] Transport failure during replica request [{}] ", exp, node, internalRequest.request()); if (!ignoreReplicaException(exp)) { logger.warn("Failed to perform " + actionName + " on remote replica " + node + shardIt.shardId(), exp); shardStateAction.shardFailed(shard, indexMetaData.getUUID(), "Failed to perform [" + actionName + "] on replica, message [" + ExceptionsHelper.detailedMessage(exp) + "]"); } - finishIfPossible(); } - private void finishIfPossible() { - if (counter.decrementAndGet() == 0) { - listener.onResponse(response.response()); - } - } }); } else { if (internalRequest.request().operationThreaded()) { @@ -689,12 +660,11 @@ public abstract class TransportShardReplicationOperationAction { - private final ReplicaRequest replicaRequest; - private final Response response; - private final Object payload; + public final class ReplicationState { - public PrimaryResponse(ReplicaRequest replicaRequest, Response response, Object payload) { + private final Request request; + private final ReplicaRequest replicaRequest; + private final Response finalResponse; + private final ShardId shardId; + private final ActionListener listener; + private final AtomicBoolean finished = new AtomicBoolean(false); + private final AtomicInteger success = new AtomicInteger(1); // We already wrote into the primary shard + private final ConcurrentMap shardReplicaFailures = ConcurrentCollections.newConcurrentMap(); + + private final AtomicInteger pending; + private final int numberOfShardInstances; + + public ReplicationState(PrimaryOperationRequest por, ShardIterator shardsIter, Response finalResponse, ReplicaRequest replicaRequest, ActionListener listener, int numberOfPendingShardInstances, int numberOfUnassignedReplicas) { + this.request = por.request; + this.finalResponse = finalResponse; this.replicaRequest = replicaRequest; - this.response = response; - this.payload = payload; + this.shardId = shardsIter.shardId(); + this.listener = listener; + this.numberOfShardInstances = 1 + numberOfPendingShardInstances + numberOfUnassignedReplicas; + this.pending = new AtomicInteger(numberOfPendingShardInstances); + } + + public Request request() { + return this.request; } public ReplicaRequest replicaRequest() { return this.replicaRequest; } - public Response response() { - return response; + public void onReplicaFailure(String nodeId, @Nullable Throwable e) { + // Only version conflict should be ignored from being put into the _shards header? + if (e != null && !ignoreReplicaException(e)) { + shardReplicaFailures.put(nodeId, e); + } + finishIfNeeded(); } - public Object payload() { - return payload; + public void onReplicaSuccess() { + success.incrementAndGet(); + finishIfNeeded(); } + + public void forceFinish() { + doFinish(); + } + + private void finishIfNeeded() { + if (pending.decrementAndGet() == 0) { + doFinish(); + } + } + + private void doFinish() { + if (finished.compareAndSet(false, true)) { + final ActionWriteResponse.ShardInfo.Failure[] failuresArray; + if (!shardReplicaFailures.isEmpty()) { + int slot = 0; + failuresArray = new ActionWriteResponse.ShardInfo.Failure[shardReplicaFailures.size()]; + for (Map.Entry entry : shardReplicaFailures.entrySet()) { + String reason = ExceptionsHelper.detailedMessage(entry.getValue()); + RestStatus restStatus = ExceptionsHelper.status(entry.getValue()); + failuresArray[slot++] = new ActionWriteResponse.ShardInfo.Failure( + shardId.getIndex(), shardId.getId(), entry.getKey(), reason, restStatus, false + ); + } + } else { + failuresArray = ActionWriteResponse.EMPTY; + } + finalResponse.setShardInfo( + new ActionWriteResponse.ShardInfo( + numberOfShardInstances, + success.get(), + pending.get(), + failuresArray + + ) + ); + listener.onResponse(finalResponse); + } + } + } /** diff --git a/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java b/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java index 687c789b7df..458af997c49 100644 --- a/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java +++ b/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java @@ -182,7 +182,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio indexAction.execute(upsertRequest, new ActionListener() { @Override public void onResponse(IndexResponse response) { - UpdateResponse update = new UpdateResponse(response.getIndex(), response.getType(), response.getId(), response.getVersion(), response.isCreated()); + UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getIndex(), response.getType(), response.getId(), response.getVersion(), response.isCreated()); if (request.request().fields() != null && request.request().fields().length > 0) { Tuple> sourceAndContent = XContentHelper.convertToMap(upsertSourceBytes, true); update.setGetResult(updateHelper.extractGetResult(request.request(), request.concreteIndex(), response.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), upsertSourceBytes)); @@ -217,7 +217,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio indexAction.execute(indexRequest, new ActionListener() { @Override public void onResponse(IndexResponse response) { - UpdateResponse update = new UpdateResponse(response.getIndex(), response.getType(), response.getId(), response.getVersion(), response.isCreated()); + UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getIndex(), response.getType(), response.getId(), response.getVersion(), response.isCreated()); update.setGetResult(updateHelper.extractGetResult(request.request(), request.concreteIndex(), response.getVersion(), result.updatedSourceAsMap(), result.updateSourceContentType(), indexSourceBytes)); listener.onResponse(update); } @@ -245,7 +245,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio deleteAction.execute(deleteRequest, new ActionListener() { @Override public void onResponse(DeleteResponse response) { - UpdateResponse update = new UpdateResponse(response.getIndex(), response.getType(), response.getId(), response.getVersion(), false); + UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getIndex(), response.getType(), response.getId(), response.getVersion(), false); update.setGetResult(updateHelper.extractGetResult(request.request(), request.concreteIndex(), response.getVersion(), result.updatedSourceAsMap(), result.updateSourceContentType(), null)); listener.onResponse(update); } diff --git a/src/main/java/org/elasticsearch/action/update/UpdateResponse.java b/src/main/java/org/elasticsearch/action/update/UpdateResponse.java index e5e075e8eff..b63ecd07165 100644 --- a/src/main/java/org/elasticsearch/action/update/UpdateResponse.java +++ b/src/main/java/org/elasticsearch/action/update/UpdateResponse.java @@ -19,7 +19,7 @@ package org.elasticsearch.action.update; -import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionWriteResponse; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.index.get.GetResult; @@ -28,7 +28,7 @@ import java.io.IOException; /** */ -public class UpdateResponse extends ActionResponse { +public class UpdateResponse extends ActionWriteResponse { private String index; private String id; @@ -38,10 +38,18 @@ public class UpdateResponse extends ActionResponse { private GetResult getResult; public UpdateResponse() { - } + /** + * Constructor to be used when a update didn't translate in a write. + * For example: update script with operation set to none + */ public UpdateResponse(String index, String type, String id, long version, boolean created) { + this(new ShardInfo(0, 0, 0), index, type, id, version, created); + } + + public UpdateResponse(ShardInfo shardInfo, String index, String type, String id, long version, boolean created) { + setShardInfo(shardInfo); this.index = index; this.id = id; this.type = type; diff --git a/src/main/java/org/elasticsearch/rest/action/bulk/RestBulkAction.java b/src/main/java/org/elasticsearch/rest/action/bulk/RestBulkAction.java index c32c5cf9f4c..76d1bd49b1f 100644 --- a/src/main/java/org/elasticsearch/rest/action/bulk/RestBulkAction.java +++ b/src/main/java/org/elasticsearch/rest/action/bulk/RestBulkAction.java @@ -19,6 +19,7 @@ package org.elasticsearch.rest.action.bulk; +import org.elasticsearch.action.ActionWriteResponse; import org.elasticsearch.action.WriteConsistencyLevel; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequest; @@ -109,10 +110,12 @@ public class RestBulkAction extends BaseRestHandler { builder.field(Fields.STATUS, itemResponse.getFailure().getStatus().getStatus()); builder.field(Fields.ERROR, itemResponse.getFailure().getMessage()); } else { + ActionWriteResponse.ShardInfo shardInfo = itemResponse.getResponse().getShardInfo(); + shardInfo.toXContent(builder, request); if (itemResponse.getResponse() instanceof DeleteResponse) { DeleteResponse deleteResponse = itemResponse.getResponse(); if (deleteResponse.isFound()) { - builder.field(Fields.STATUS, RestStatus.OK.getStatus()); + builder.field(Fields.STATUS, shardInfo.status()); } else { builder.field(Fields.STATUS, RestStatus.NOT_FOUND.getStatus()); } @@ -122,14 +125,14 @@ public class RestBulkAction extends BaseRestHandler { if (indexResponse.isCreated()) { builder.field(Fields.STATUS, RestStatus.CREATED.getStatus()); } else { - builder.field(Fields.STATUS, RestStatus.OK.getStatus()); + builder.field(Fields.STATUS, shardInfo.status()); } } else if (itemResponse.getResponse() instanceof UpdateResponse) { UpdateResponse updateResponse = itemResponse.getResponse(); if (updateResponse.isCreated()) { builder.field(Fields.STATUS, RestStatus.CREATED.getStatus()); } else { - builder.field(Fields.STATUS, RestStatus.OK.getStatus()); + builder.field(Fields.STATUS, shardInfo.status()); } } } diff --git a/src/main/java/org/elasticsearch/rest/action/delete/RestDeleteAction.java b/src/main/java/org/elasticsearch/rest/action/delete/RestDeleteAction.java index e2a748524e4..1a4dbe89783 100644 --- a/src/main/java/org/elasticsearch/rest/action/delete/RestDeleteAction.java +++ b/src/main/java/org/elasticsearch/rest/action/delete/RestDeleteAction.java @@ -19,6 +19,7 @@ package org.elasticsearch.rest.action.delete; +import org.elasticsearch.action.ActionWriteResponse; import org.elasticsearch.action.WriteConsistencyLevel; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteResponse; @@ -35,7 +36,6 @@ import org.elasticsearch.rest.action.support.RestBuilderListener; import static org.elasticsearch.rest.RestRequest.Method.DELETE; import static org.elasticsearch.rest.RestStatus.NOT_FOUND; -import static org.elasticsearch.rest.RestStatus.OK; /** * @@ -74,14 +74,15 @@ public class RestDeleteAction extends BaseRestHandler { client.delete(deleteRequest, new RestBuilderListener(channel) { @Override public RestResponse buildResponse(DeleteResponse result, XContentBuilder builder) throws Exception { - builder.startObject() - .field(Fields.FOUND, result.isFound()) + ActionWriteResponse.ShardInfo shardInfo = result.getShardInfo(); + builder.startObject().field(Fields.FOUND, result.isFound()) .field(Fields._INDEX, result.getIndex()) .field(Fields._TYPE, result.getType()) .field(Fields._ID, result.getId()) .field(Fields._VERSION, result.getVersion()) + .value(shardInfo) .endObject(); - RestStatus status = OK; + RestStatus status = shardInfo.status(); if (!result.isFound()) { status = NOT_FOUND; } diff --git a/src/main/java/org/elasticsearch/rest/action/deletebyquery/RestDeleteByQueryAction.java b/src/main/java/org/elasticsearch/rest/action/deletebyquery/RestDeleteByQueryAction.java index 76321c9cc6a..c9ce37c140b 100644 --- a/src/main/java/org/elasticsearch/rest/action/deletebyquery/RestDeleteByQueryAction.java +++ b/src/main/java/org/elasticsearch/rest/action/deletebyquery/RestDeleteByQueryAction.java @@ -19,7 +19,6 @@ package org.elasticsearch.rest.action.deletebyquery; -import org.elasticsearch.action.ShardOperationFailedException; import org.elasticsearch.action.WriteConsistencyLevel; import org.elasticsearch.action.deletebyquery.DeleteByQueryRequest; import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse; @@ -90,29 +89,11 @@ public class RestDeleteByQueryAction extends BaseRestHandler { builder.startObject(Fields._INDICES); for (IndexDeleteByQueryResponse indexDeleteByQueryResponse : result.getIndices().values()) { builder.startObject(indexDeleteByQueryResponse.getIndex(), XContentBuilder.FieldCaseConversion.NONE); - - builder.startObject(Fields._SHARDS); - builder.field(Fields.TOTAL, indexDeleteByQueryResponse.getTotalShards()); - builder.field(Fields.SUCCESSFUL, indexDeleteByQueryResponse.getSuccessfulShards()); - builder.field(Fields.FAILED, indexDeleteByQueryResponse.getFailedShards()); - ShardOperationFailedException[] failures = indexDeleteByQueryResponse.getFailures(); - if (failures != null && failures.length > 0) { - builder.startArray(Fields.FAILURES); - for (ShardOperationFailedException shardFailure : failures) { - builder.startObject(); - builder.field(Fields.INDEX, shardFailure.index()); - builder.field(Fields.SHARD, shardFailure.shardId()); - builder.field(Fields.REASON, shardFailure.reason()); - builder.endObject(); - } - builder.endArray(); - } + indexDeleteByQueryResponse.getShardInfo().toXContent(builder, request); builder.endObject(); - builder.endObject(); } builder.endObject(); - builder.endObject(); return new BytesRestResponse(restStatus, builder); } }); diff --git a/src/main/java/org/elasticsearch/rest/action/index/RestIndexAction.java b/src/main/java/org/elasticsearch/rest/action/index/RestIndexAction.java index 24e00ecf90d..958c229225b 100644 --- a/src/main/java/org/elasticsearch/rest/action/index/RestIndexAction.java +++ b/src/main/java/org/elasticsearch/rest/action/index/RestIndexAction.java @@ -20,6 +20,7 @@ package org.elasticsearch.rest.action.index; import org.elasticsearch.ElasticsearchIllegalArgumentException; +import org.elasticsearch.action.ActionWriteResponse; import org.elasticsearch.action.WriteConsistencyLevel; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; @@ -109,14 +110,16 @@ public class RestIndexAction extends BaseRestHandler { client.index(indexRequest, new RestBuilderListener(channel) { @Override public RestResponse buildResponse(IndexResponse response, XContentBuilder builder) throws Exception { - builder.startObject() - .field(Fields._INDEX, response.getIndex()) + builder.startObject(); + ActionWriteResponse.ShardInfo shardInfo = response.getShardInfo(); + builder.field(Fields._INDEX, response.getIndex()) .field(Fields._TYPE, response.getType()) .field(Fields._ID, response.getId()) - .field(Fields._VERSION, response.getVersion()) - .field(Fields.CREATED, response.isCreated()); + .field(Fields._VERSION, response.getVersion()); + shardInfo.toXContent(builder, request); + builder.field(Fields.CREATED, response.isCreated()); builder.endObject(); - RestStatus status = OK; + RestStatus status = shardInfo.status(); if (response.isCreated()) { status = CREATED; } diff --git a/src/main/java/org/elasticsearch/rest/action/update/RestUpdateAction.java b/src/main/java/org/elasticsearch/rest/action/update/RestUpdateAction.java index c60a2e001cb..59416db071e 100644 --- a/src/main/java/org/elasticsearch/rest/action/update/RestUpdateAction.java +++ b/src/main/java/org/elasticsearch/rest/action/update/RestUpdateAction.java @@ -19,6 +19,7 @@ package org.elasticsearch.rest.action.update; +import org.elasticsearch.action.ActionWriteResponse; import org.elasticsearch.action.WriteConsistencyLevel; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.replication.ReplicationType; @@ -41,7 +42,6 @@ import java.util.Map; import static org.elasticsearch.rest.RestRequest.Method.POST; import static org.elasticsearch.rest.RestStatus.CREATED; -import static org.elasticsearch.rest.RestStatus.OK; /** */ @@ -126,12 +126,14 @@ public class RestUpdateAction extends BaseRestHandler { client.update(updateRequest, new RestBuilderListener(channel) { @Override public RestResponse buildResponse(UpdateResponse response, XContentBuilder builder) throws Exception { - builder.startObject() - .field(Fields._INDEX, response.getIndex()) + builder.startObject(); + ActionWriteResponse.ShardInfo shardInfo = response.getShardInfo(); + builder.field(Fields._INDEX, response.getIndex()) .field(Fields._TYPE, response.getType()) .field(Fields._ID, response.getId()) .field(Fields._VERSION, response.getVersion()); + shardInfo.toXContent(builder, request); if (response.getGetResult() != null) { builder.startObject(Fields.GET); response.getGetResult().toXContentEmbedded(builder, request); @@ -139,7 +141,7 @@ public class RestUpdateAction extends BaseRestHandler { } builder.endObject(); - RestStatus status = OK; + RestStatus status = shardInfo.status(); if (response.isCreated()) { status = CREATED; } @@ -153,7 +155,6 @@ public class RestUpdateAction extends BaseRestHandler { static final XContentBuilderString _TYPE = new XContentBuilderString("_type"); static final XContentBuilderString _ID = new XContentBuilderString("_id"); static final XContentBuilderString _VERSION = new XContentBuilderString("_version"); - static final XContentBuilderString MATCHES = new XContentBuilderString("matches"); static final XContentBuilderString GET = new XContentBuilderString("get"); } } diff --git a/src/test/java/org/elasticsearch/bwcompat/BasicBackwardsCompatibilityTest.java b/src/test/java/org/elasticsearch/bwcompat/BasicBackwardsCompatibilityTest.java index 7eff7728bbc..580bdd1c828 100644 --- a/src/test/java/org/elasticsearch/bwcompat/BasicBackwardsCompatibilityTest.java +++ b/src/test/java/org/elasticsearch/bwcompat/BasicBackwardsCompatibilityTest.java @@ -498,7 +498,7 @@ public class BasicBackwardsCompatibilityTest extends ElasticsearchBackwardsCompa assertThat(deleteByQueryResponse.getIndices().size(), equalTo(1)); for (IndexDeleteByQueryResponse indexDeleteByQueryResponse : deleteByQueryResponse) { assertThat(indexDeleteByQueryResponse.getIndex(), equalTo("test")); - assertThat(indexDeleteByQueryResponse.getFailures().length, equalTo(0)); + assertThat(indexDeleteByQueryResponse.getShardInfo().getFailures().length, equalTo(0)); } refresh(); diff --git a/src/test/java/org/elasticsearch/deleteByQuery/DeleteByQueryTests.java b/src/test/java/org/elasticsearch/deleteByQuery/DeleteByQueryTests.java index 66ec621ea6d..48461501317 100644 --- a/src/test/java/org/elasticsearch/deleteByQuery/DeleteByQueryTests.java +++ b/src/test/java/org/elasticsearch/deleteByQuery/DeleteByQueryTests.java @@ -19,7 +19,7 @@ package org.elasticsearch.deleteByQuery; -import org.elasticsearch.action.ShardOperationFailedException; +import org.elasticsearch.action.ActionWriteResponse; import org.elasticsearch.action.admin.indices.alias.Alias; import org.elasticsearch.action.deletebyquery.DeleteByQueryRequestBuilder; import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse; @@ -35,9 +35,7 @@ import org.junit.Test; import java.util.concurrent.ExecutionException; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.*; import static org.hamcrest.Matchers.*; public class DeleteByQueryTests extends ElasticsearchIntegrationTest { @@ -65,10 +63,9 @@ public class DeleteByQueryTests extends ElasticsearchIntegrationTest { DeleteByQueryRequestBuilder deleteByQueryRequestBuilder = client().prepareDeleteByQuery(); deleteByQueryRequestBuilder.setQuery(QueryBuilders.matchAllQuery()); - DeleteByQueryResponse actionGet = deleteByQueryRequestBuilder.execute().actionGet(); - assertThat(actionGet.status(), equalTo(RestStatus.OK)); - assertThat(actionGet.getIndex("twitter"), notNullValue()); - assertThat(actionGet.getIndex("twitter").getFailedShards(), equalTo(0)); + DeleteByQueryResponse response = deleteByQueryRequestBuilder.execute().actionGet(); + assertThat(response.status(), equalTo(RestStatus.OK)); + assertSyncShardInfo(response.getIndex("twitter").getShardInfo(), getNumShards("twitter")); client().admin().indices().prepareRefresh().execute().actionGet(); search = client().prepareSearch().setQuery(QueryBuilders.matchAllQuery()).execute().actionGet(); @@ -96,10 +93,9 @@ public class DeleteByQueryTests extends ElasticsearchIntegrationTest { } deleteByQueryRequestBuilder.setIndicesOptions(IndicesOptions.lenientExpandOpen()); - DeleteByQueryResponse actionGet = deleteByQueryRequestBuilder.execute().actionGet(); - assertThat(actionGet.status(), equalTo(RestStatus.OK)); - assertThat(actionGet.getIndex("twitter").getFailedShards(), equalTo(0)); - assertThat(actionGet.getIndex("twitter"), notNullValue()); + DeleteByQueryResponse response = deleteByQueryRequestBuilder.execute().actionGet(); + assertThat(response.status(), equalTo(RestStatus.OK)); + assertSyncShardInfo(response.getIndex("twitter").getShardInfo(), getNumShards("twitter")); client().admin().indices().prepareRefresh().execute().actionGet(); search = client().prepareSearch().setQuery(QueryBuilders.matchAllQuery()).execute().actionGet(); @@ -117,12 +113,11 @@ public class DeleteByQueryTests extends ElasticsearchIntegrationTest { NumShards twitter = getNumShards("test"); assertThat(response.status(), equalTo(RestStatus.BAD_REQUEST)); - assertThat(response.getIndex("test").getSuccessfulShards(), equalTo(0)); - assertThat(response.getIndex("test").getFailedShards(), equalTo(twitter.numPrimaries)); + assertThat(response.getIndex("test").getShardInfo().getSuccessful(), equalTo(0)); + assertThat(response.getIndex("test").getShardInfo().getFailures().length, equalTo(twitter.numPrimaries)); assertThat(response.getIndices().size(), equalTo(1)); - assertThat(response.getIndices().get("test").getFailedShards(), equalTo(twitter.numPrimaries)); - assertThat(response.getIndices().get("test").getFailures().length, equalTo(twitter.numPrimaries)); - for (ShardOperationFailedException failure : response.getIndices().get("test").getFailures()) { + assertThat(response.getIndices().get("test").getShardInfo().getFailures().length, equalTo(twitter.numPrimaries)); + for (ActionWriteResponse.ShardInfo.Failure failure : response.getIndices().get("test").getShardInfo().getFailures()) { assertThat(failure.reason(), containsString("[test] [has_child] query and filter unsupported in delete_by_query api")); assertThat(failure.status(), equalTo(RestStatus.BAD_REQUEST)); assertThat(failure.shardId(), greaterThan(-1)); @@ -182,7 +177,7 @@ public class DeleteByQueryTests extends ElasticsearchIntegrationTest { assertThat(deleteByQueryResponse.getIndices().size(), equalTo(1)); for (IndexDeleteByQueryResponse indexDeleteByQueryResponse : deleteByQueryResponse) { assertThat(indexDeleteByQueryResponse.getIndex(), equalTo("test")); - assertThat(indexDeleteByQueryResponse.getFailures().length, equalTo(0)); + assertThat(indexDeleteByQueryResponse.getShardInfo().getFailures().length, equalTo(0)); } refresh(); @@ -194,4 +189,14 @@ public class DeleteByQueryTests extends ElasticsearchIntegrationTest { private static String indexOrAlias() { return randomBoolean() ? "test" : "alias"; } + + private void assertSyncShardInfo(ActionWriteResponse.ShardInfo shardInfo, NumShards numShards) { + assertThat(shardInfo.getTotal(), equalTo(numShards.totalNumShards)); + assertThat(shardInfo.getSuccessful(), greaterThanOrEqualTo(numShards.numPrimaries)); + assertThat(shardInfo.getPending(), equalTo(0)); + assertThat(shardInfo.getFailed(), equalTo(0)); + for (ActionWriteResponse.ShardInfo.Failure failure : shardInfo.getFailures()) { + assertThat(failure.status(), equalTo(RestStatus.OK)); + } + } } diff --git a/src/test/java/org/elasticsearch/document/DocumentActionsTests.java b/src/test/java/org/elasticsearch/document/DocumentActionsTests.java index f88be7850c6..24b77f9d2b7 100644 --- a/src/test/java/org/elasticsearch/document/DocumentActionsTests.java +++ b/src/test/java/org/elasticsearch/document/DocumentActionsTests.java @@ -187,8 +187,9 @@ public class DocumentActionsTests extends ElasticsearchIntegrationTest { logger.info("Delete by query"); DeleteByQueryResponse queryResponse = client().prepareDeleteByQuery().setIndices("test").setQuery(termQuery("name", "test2")).execute().actionGet(); - assertThat(queryResponse.getIndex(getConcreteIndexName()).getSuccessfulShards(), equalTo(numShards.numPrimaries)); - assertThat(queryResponse.getIndex(getConcreteIndexName()).getFailedShards(), equalTo(0)); + assertThat(queryResponse.getIndex(getConcreteIndexName()).getShardInfo().getTotal(), equalTo(numShards.totalNumShards)); + assertThat(queryResponse.getIndex(getConcreteIndexName()).getShardInfo().getSuccessful(), equalTo(numShards.totalNumShards)); + assertThat(queryResponse.getIndex(getConcreteIndexName()).getShardInfo().getFailures().length, equalTo(0)); client().admin().indices().refresh(refreshRequest("test")).actionGet(); logger.info("Get [type1/1] and [type1/2], should be empty"); diff --git a/src/test/java/org/elasticsearch/document/ShardInfoTests.java b/src/test/java/org/elasticsearch/document/ShardInfoTests.java new file mode 100644 index 00000000000..f342ee612da --- /dev/null +++ b/src/test/java/org/elasticsearch/document/ShardInfoTests.java @@ -0,0 +1,185 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.document; + +import org.elasticsearch.action.ActionWriteResponse; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; +import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkRequestBuilder; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.delete.DeleteResponse; +import org.elasticsearch.action.deletebyquery.IndexDeleteByQueryResponse; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.support.replication.ReplicationType; +import org.elasticsearch.action.update.UpdateResponse; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.test.ElasticsearchIntegrationTest; +import org.junit.Test; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.*; + +/** + */ +public class ShardInfoTests extends ElasticsearchIntegrationTest { + + private int numCopies; + private int numNodes; + + @Test + public void testIndexAndDelete() throws Exception { + prepareIndex(1); + IndexResponse indexResponse = client().prepareIndex("idx", "type").setSource("{}").get(); + assertShardInfo(indexResponse); + DeleteResponse deleteResponse = client().prepareDelete("idx", "type", indexResponse.getId()).get(); + assertShardInfo(deleteResponse); + } + + @Test + public void testUpdate() throws Exception { + prepareIndex(1); + UpdateResponse updateResponse = client().prepareUpdate("idx", "type", "1").setDoc("{}").setDocAsUpsert(true).get(); + assertShardInfo(updateResponse); + } + + @Test + public void testBulk_withIndexAndDeleteItems() throws Exception { + prepareIndex(1); + BulkRequestBuilder bulkRequestBuilder = client().prepareBulk(); + for (int i = 0; i < 10; i++) { + bulkRequestBuilder.add(client().prepareIndex("idx", "type").setSource("{}")); + } + + BulkResponse bulkResponse = bulkRequestBuilder.get(); + bulkRequestBuilder = client().prepareBulk(); + for (BulkItemResponse item : bulkResponse) { + assertThat(item.isFailed(), equalTo(false)); + assertShardInfo(item.getResponse()); + bulkRequestBuilder.add(client().prepareDelete("idx", "type", item.getId())); + } + + bulkResponse = bulkRequestBuilder.get(); + for (BulkItemResponse item : bulkResponse) { + assertThat(item.isFailed(), equalTo(false)); + assertShardInfo(item.getResponse()); + } + } + + @Test + public void testBulk_withUpdateItems() throws Exception { + prepareIndex(1); + BulkRequestBuilder bulkRequestBuilder = client().prepareBulk(); + for (int i = 0; i < 10; i++) { + bulkRequestBuilder.add(client().prepareUpdate("idx", "type", Integer.toString(i)).setDoc("{}").setDocAsUpsert(true)); + } + + BulkResponse bulkResponse = bulkRequestBuilder.get(); + for (BulkItemResponse item : bulkResponse) { + assertThat(item.isFailed(), equalTo(false)); + assertShardInfo(item.getResponse()); + } + } + + @Test + public void testDeleteWithRoutingRequiredButNotSpecified() throws Exception { + int numPrimaryShards = randomIntBetween(1, 2); + prepareIndex(numPrimaryShards, true); + DeleteResponse deleteResponse = client().prepareDelete("idx", "type", "1").get(); + assertShardInfo(deleteResponse, numCopies * numPrimaryShards, numNodes * numPrimaryShards, 0); + } + + @Test + public void testDeleteByQuery() throws Exception { + int numPrimaryShards = randomIntBetween(1, 2); + prepareIndex(numPrimaryShards); + IndexDeleteByQueryResponse indexDeleteByQueryResponse = client().prepareDeleteByQuery("idx") + .setQuery(QueryBuilders.matchAllQuery()) + .get().getIndex("idx"); + assertShardInfo(indexDeleteByQueryResponse, numCopies * numPrimaryShards, numNodes * numPrimaryShards, 0); + } + + @Test + public void testIndexWithAsyncReplication() throws Exception { + prepareIndex(1); + IndexResponse indexResponse = client().prepareIndex("idx", "type") + .setReplicationType(ReplicationType.ASYNC) + .setSource("{}") + .get(); + assertShardInfo(indexResponse, numCopies, 1, numNodes - 1); + } + + private void prepareIndex(int numberOfPrimaryShards) throws Exception { + prepareIndex(numberOfPrimaryShards, false); + } + + private void prepareIndex(int numberOfPrimaryShards, boolean routingRequired) throws Exception { + numNodes = cluster().numDataNodes(); + logger.info("Number of nodes: {}", numNodes); + int maxNumberOfCopies = (numNodes * 2) - 1; + numCopies = randomIntBetween(numNodes, maxNumberOfCopies); + logger.info("Number of copies: {}", numCopies); + + assertAcked(prepareCreate("idx").setSettings( + ImmutableSettings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, numberOfPrimaryShards) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, numCopies - 1)) + .addMapping("type", "_routing", "required=" + routingRequired) + .get()); + for (int i = 0; i < numberOfPrimaryShards; i++) { + ensureActiveShardCopies(i, numNodes); + } + } + + private void assertShardInfo(ActionWriteResponse response) { + assertShardInfo(response, numCopies, numNodes, 0); + } + + private void assertShardInfo(ActionWriteResponse response, int expectedTotal, int expectedSuccessful, int expectedPending) { + assertThat(response.getShardInfo().getTotal(), equalTo(expectedTotal)); + assertThat(response.getShardInfo().getSuccessful(), equalTo(expectedSuccessful)); + assertThat(response.getShardInfo().getPending(), equalTo(expectedPending)); + } + + private void ensureActiveShardCopies(final int shardId, final int copyCount) throws Exception { + assertBusy(new Runnable() { + @Override + public void run() { + ClusterState state = client().admin().cluster().prepareState().get().getState(); + assertThat(state.routingTable().index("idx"), not(nullValue())); + assertThat(state.routingTable().index("idx").shard(shardId), not(nullValue())); + assertThat(state.routingTable().index("idx").shard(shardId).activeShards().size(), equalTo(copyCount)); + + ClusterHealthResponse healthResponse = client().admin().cluster().prepareHealth("idx") + .setWaitForRelocatingShards(0) + .get(); + assertThat(healthResponse.isTimedOut(), equalTo(false)); + + RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries("idx") + .setActiveOnly(true) + .get(); + assertThat(recoveryResponse.shardResponses().get("idx").size(), equalTo(0)); + } + }); + } +} diff --git a/src/test/java/org/elasticsearch/search/child/SimpleChildQuerySearchTests.java b/src/test/java/org/elasticsearch/search/child/SimpleChildQuerySearchTests.java index 02ea5fc5ced..fa20daa31f2 100644 --- a/src/test/java/org/elasticsearch/search/child/SimpleChildQuerySearchTests.java +++ b/src/test/java/org/elasticsearch/search/child/SimpleChildQuerySearchTests.java @@ -1390,9 +1390,9 @@ public class SimpleChildQuerySearchTests extends ElasticsearchIntegrationTest { // Delete by query doesn't support p/c queries. If the delete by query has a different execution mode // that doesn't rely on IW#deleteByQuery() then this test can be changed. DeleteByQueryResponse deleteByQueryResponse = client().prepareDeleteByQuery("test").setQuery(randomHasChild("child", "c_field", "blue")).get(); - assertThat(deleteByQueryResponse.getIndex("test").getSuccessfulShards(), equalTo(0)); - assertThat(deleteByQueryResponse.getIndex("test").getFailedShards(), equalTo(getNumShards("test").numPrimaries)); - assertThat(deleteByQueryResponse.getIndex("test").getFailures()[0].reason(), containsString("[has_child] query and filter unsupported in delete_by_query api")); + assertThat(deleteByQueryResponse.getIndex("test").getShardInfo().getSuccessful(), equalTo(0)); + assertThat(deleteByQueryResponse.getIndex("test").getShardInfo().getFailures().length, equalTo(getNumShards("test").numPrimaries)); + assertThat(deleteByQueryResponse.getIndex("test").getShardInfo().getFailures()[0].reason(), containsString("[has_child] query and filter unsupported in delete_by_query api")); client().admin().indices().prepareRefresh("test").get(); searchResponse = client().prepareSearch("test") @@ -1435,9 +1435,9 @@ public class SimpleChildQuerySearchTests extends ElasticsearchIntegrationTest { assertHitCount(searchResponse, 3l); DeleteByQueryResponse deleteByQueryResponse = client().prepareDeleteByQuery("test").setQuery(randomHasChild("child", "c_field", "blue")).get(); - assertThat(deleteByQueryResponse.getIndex("test").getSuccessfulShards(), equalTo(0)); - assertThat(deleteByQueryResponse.getIndex("test").getFailedShards(), equalTo(getNumShards("test").numPrimaries)); - assertThat(deleteByQueryResponse.getIndex("test").getFailures()[0].reason(), containsString("[has_child] query and filter unsupported in delete_by_query api")); + assertThat(deleteByQueryResponse.getIndex("test").getShardInfo().getSuccessful(), equalTo(0)); + assertThat(deleteByQueryResponse.getIndex("test").getShardInfo().getFailures().length, equalTo(getNumShards("test").numPrimaries)); + assertThat(deleteByQueryResponse.getIndex("test").getShardInfo().getFailures()[0].reason(), containsString("[has_child] query and filter unsupported in delete_by_query api")); client().admin().indices().prepareRefresh("test").get(); searchResponse = client().prepareSearch("test") @@ -1488,9 +1488,9 @@ public class SimpleChildQuerySearchTests extends ElasticsearchIntegrationTest { DeleteByQueryResponse deleteByQueryResponse = client().prepareDeleteByQuery("test") .setQuery(randomHasParent("parent", "p_field", "p_value2")) .get(); - assertThat(deleteByQueryResponse.getIndex("test").getSuccessfulShards(), equalTo(0)); - assertThat(deleteByQueryResponse.getIndex("test").getFailedShards(), equalTo(getNumShards("test").numPrimaries)); - assertThat(deleteByQueryResponse.getIndex("test").getFailures()[0].reason(), containsString("[has_parent] query and filter unsupported in delete_by_query api")); + assertThat(deleteByQueryResponse.getIndex("test").getShardInfo().getSuccessful(), equalTo(0)); + assertThat(deleteByQueryResponse.getIndex("test").getShardInfo().getFailures().length, equalTo(getNumShards("test").numPrimaries)); + assertThat(deleteByQueryResponse.getIndex("test").getShardInfo().getFailures()[0].reason(), containsString("[has_parent] query and filter unsupported in delete_by_query api")); client().admin().indices().prepareRefresh("test").get(); client().admin().indices().prepareRefresh("test").get(); client().admin().indices().prepareRefresh("test").get();