Core: Added `_shards` header to all write responses.

The header indicates to how many shard copies (primary and replicas shards) a write was supposed to go to, to how many
shard copies to write succeeded and potentially captures shard failures if writing into a replica shard fails.

For async writes it also includes the number of shards a write is still pending.

Closes #7994
This commit is contained in:
Martijn van Groningen 2014-09-24 14:54:50 +02:00
parent 1ad64a97ec
commit ca4f27f40e
42 changed files with 984 additions and 382 deletions

View File

@ -32,9 +32,9 @@ commands is:
"_indices" : { "_indices" : {
"twitter" : { "twitter" : {
"_shards" : { "_shards" : {
"total" : 5, "total" : 10,
"successful" : 5, "failed" : 0,
"failed" : 0 "successful" : 10,
} }
} }
} }

View File

@ -16,6 +16,11 @@ The result of the above delete operation is:
[source,js] [source,js]
-------------------------------------------------- --------------------------------------------------
{ {
"_shards" : {
"total" : 10,
"failed" : 0,
"successful" : 10
},
"found" : true, "found" : true,
"_index" : "twitter", "_index" : "twitter",
"_type" : "tweet", "_type" : "tweet",

View File

@ -19,6 +19,11 @@ The result of the above index operation is:
[source,js] [source,js]
-------------------------------------------------- --------------------------------------------------
{ {
"_shards" : {
"total" : 10,
"failed" : 0,
"successful" : 10
},
"_index" : "twitter", "_index" : "twitter",
"_type" : "tweet", "_type" : "tweet",
"_id" : "1", "_id" : "1",
@ -27,6 +32,20 @@ The result of the above index operation is:
} }
-------------------------------------------------- --------------------------------------------------
The `_shards` header provides information about the replication process of the index operation.
* `total` - Indicates to how many shard copies (primary and replica shards) the index operation should be executed on.
* `successful`- Indicates the number of shard copies the index operation succeeded on.
* `pending` - Indicates how many shard copies this index operation still needs to go to at the time index operation
succeeded on the primary shard. This field is only returned if `async` replication is used.
* `failures` - An array that contains replication related errors in the case an index operation failed on a replica shard.
The index operation is successful in the case `successful` is at least 1.
NOTE: Replica shards may not all be started when an indexing operation successfully returns (by default, a quorum is
required). In that case, `total` will be equal to the total shards based on the index replica settings and
`successful` will be equal to the number of shard started (primary plus replicas). As there were no failures,
the `failed` will be 0.
[float] [float]
[[index-creation]] [[index-creation]]
=== Automatic Index Creation === Automatic Index Creation

View File

@ -114,4 +114,12 @@ well.
The `parent` parameter has been removed from the update request. Before 2.x it just set the routing parameter. The The `parent` parameter has been removed from the update request. Before 2.x it just set the routing parameter. The
`routing` setting should be used instead. The `parent` setting was confusing, because it had the impression that the parent `routing` setting should be used instead. The `parent` setting was confusing, because it had the impression that the parent
a child documents points to can be changed but this is not true. a child documents points to can be changed but this is not true.
==== Delete by query
The meaning of the `_shards` headers in the delete by query response has changed. Before version 2.0 the `total`,
`successful` and `failed` fields in the header are based on the number of primary shards. The failures on replica
shards aren't being kept track of. From version 2.0 the stats in the `_shards` header are based on all shards
of an index. The http status code is left unchanged and is only based on failures that occurred while executing on
primary shards.

View File

@ -0,0 +1,36 @@
---
"Delete check shard header":
- do:
indices.create:
index: foobar
body:
settings:
number_of_shards: "1"
number_of_replicas: "0"
- do:
cluster.health:
wait_for_status: green
- do:
index:
index: foobar
type: baz
id: 1
body: { foo: bar }
- do:
delete:
index: foobar
type: baz
id: 1
- match: { _index: foobar }
- match: { _type: baz }
- match: { _id: "1"}
- match: { _version: 2}
- match: { _shards.total: 1}
- match: { _shards.successful: 1}
- match: { _shards.failed: 0}
- is_false: _shards.pending

View File

@ -0,0 +1,56 @@
---
"Index check shard header":
- do:
indices.create:
index: foobar1
body:
settings:
number_of_shards: "1"
number_of_replicas: "0"
- do:
indices.create:
index: foobar2
body:
settings:
number_of_shards: "1"
number_of_replicas: "1"
- do:
cluster.health:
wait_for_status: green
- do:
index:
index: foobar1
type: baz
id: 1
body: { foo: bar }
- match: { _index: foobar1 }
- match: { _type: baz }
- match: { _id: "1"}
- match: { _version: 1}
- match: { _shards.total: 1}
- match: { _shards.successful: 1}
- match: { _shards.failed: 0}
- is_false: _shards.pending
- do:
index:
index: foobar2
type: baz
id: 1
replication: async
body: { foo: bar }
- match: { _index: foobar2 }
- match: { _type: baz }
- match: { _id: "1"}
- match: { _version: 1}
- match: { _shards.total: 2}
- match: { _shards.successful: 1}
- match: { _shards.failed: 0}
- match: { _shards.pending: 1}

View File

@ -0,0 +1,39 @@
---
"Update check shard header":
- do:
indices.create:
index: foobar
body:
settings:
number_of_shards: "1"
number_of_replicas: "0"
- do:
cluster.health:
wait_for_status: green
- do:
index:
index: foobar
type: baz
id: 1
body: { foo: bar }
- do:
update:
index: foobar
type: baz
id: 1
body:
doc:
foo: baz
- match: { _index: foobar }
- match: { _type: baz }
- match: { _id: "1"}
- match: { _version: 2}
- match: { _shards.total: 1}
- match: { _shards.successful: 1}
- match: { _shards.failed: 0}
- is_false: _shards.pending

View File

@ -0,0 +1,298 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.rest.RestStatus;
import java.io.IOException;
/**
* Base class for write action responses.
*/
public abstract class ActionWriteResponse extends ActionResponse {
public final static ActionWriteResponse.ShardInfo.Failure[] EMPTY = new ActionWriteResponse.ShardInfo.Failure[0];
private ShardInfo shardInfo;
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
shardInfo = ActionWriteResponse.ShardInfo.readShardInfo(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
shardInfo.writeTo(out);
}
public ShardInfo getShardInfo() {
return shardInfo;
}
public void setShardInfo(ShardInfo shardInfo) {
this.shardInfo = shardInfo;
}
public static class ShardInfo implements Streamable, ToXContent {
private int total;
private int successful;
private int pending;
private Failure[] failures = EMPTY;
public ShardInfo() {
}
public ShardInfo(int total, int successful, int pending, Failure... failures) {
assert total >= 0 && successful >= 0 && pending >= 0;
this.total = total;
this.successful = successful;
this.pending = pending;
this.failures = failures;
}
/**
* @return the total number of shards the write should go to.
*/
public int getTotal() {
return total;
}
/**
* @return the total number of shards the write succeeded on.
*/
public int getSuccessful() {
return successful;
}
/**
* @return the total number of shards a write is still to be performed on at the time this response was
* created. Typically this will only contain 0, but when async replication is used this number is higher than 0.
*/
public int getPending() {
return pending;
}
/**
* @return The total number of replication failures.
*/
public int getFailed() {
return failures.length;
}
/**
* @return The replication failures that have been captured in the case writes have failed on replica shards.
*/
public Failure[] getFailures() {
return failures;
}
public RestStatus status() {
RestStatus status = RestStatus.OK;
for (Failure failure : failures) {
if (failure.primary() && failure.status().getStatus() > status.getStatus()) {
status = failure.status();
}
}
return status;
}
@Override
public void readFrom(StreamInput in) throws IOException {
total = in.readVInt();
successful = in.readVInt();
pending = in.readVInt();
int size = in.readVInt();
failures = new Failure[size];
for (int i = 0; i < size; i++) {
Failure failure = new Failure();
failure.readFrom(in);
failures[i] = failure;
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(total);
out.writeVInt(successful);
out.writeVInt(pending);
out.writeVInt(failures.length);
for (Failure failure : failures) {
failure.writeTo(out);
}
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(Fields._SHARDS);
builder.field(Fields.TOTAL, total);
builder.field(Fields.SUCCESSFUL, successful);
if (pending > 0) {
builder.field(Fields.PENDING, pending);
}
builder.field(Fields.FAILED, getFailed());
if (failures.length > 0) {
builder.startArray(Fields.FAILURES);
for (Failure failure : failures) {
failure.toXContent(builder, params);
}
builder.endArray();
}
builder.endObject();
return builder;
}
public static ShardInfo readShardInfo(StreamInput in) throws IOException {
ShardInfo shardInfo = new ShardInfo();
shardInfo.readFrom(in);
return shardInfo;
}
public static class Failure implements ShardOperationFailedException, ToXContent {
private String index;
private int shardId;
private String nodeId;
private String reason;
private RestStatus status;
private boolean primary;
public Failure(String index, int shardId, @Nullable String nodeId, String reason, RestStatus status, boolean primary) {
this.index = index;
this.shardId = shardId;
this.nodeId = nodeId;
this.reason = reason;
this.status = status;
this.primary = primary;
}
Failure() {
}
/**
* @return On what index the failure occurred.
*/
public String index() {
return index;
}
/**
* @return On what shard id the failure occurred.
*/
public int shardId() {
return shardId;
}
/**
* @return On what node the failure occurred.
*/
@Nullable
public String nodeId() {
return nodeId;
}
/**
* @return A text description of the failure
*/
public String reason() {
return reason;
}
/**
* @return The status to report if this failure was a primary failure.
*/
public RestStatus status() {
return status;
}
/**
* @return Whether this failure occurred on a primary shard.
* (this only reports true for delete by query)
*/
public boolean primary() {
return primary;
}
@Override
public void readFrom(StreamInput in) throws IOException {
index = in.readString();
shardId = in.readVInt();
nodeId = in.readOptionalString();
reason = in.readString();
status = RestStatus.readFrom(in);
primary = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(index);
out.writeVInt(shardId);
out.writeOptionalString(nodeId);
out.writeString(reason);
RestStatus.writeTo(out, status);
out.writeBoolean(primary);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(Fields._INDEX, index);
builder.field(Fields._SHARD, shardId);
builder.field(Fields._NODE, nodeId);
builder.field(Fields.REASON, reason);
builder.field(Fields.STATUS, status);
builder.field(Fields.PRIMARY, primary);
builder.endObject();
return builder;
}
private static class Fields {
private static final XContentBuilderString _INDEX = new XContentBuilderString("_index");
private static final XContentBuilderString _SHARD = new XContentBuilderString("_shard");
private static final XContentBuilderString _NODE = new XContentBuilderString("_node");
private static final XContentBuilderString REASON = new XContentBuilderString("reason");
private static final XContentBuilderString STATUS = new XContentBuilderString("status");
private static final XContentBuilderString PRIMARY = new XContentBuilderString("primary");
}
}
private static class Fields {
private static final XContentBuilderString _SHARDS = new XContentBuilderString("_shards");
private static final XContentBuilderString TOTAL = new XContentBuilderString("total");
private static final XContentBuilderString SUCCESSFUL = new XContentBuilderString("successful");
private static final XContentBuilderString PENDING = new XContentBuilderString("pending");
private static final XContentBuilderString FAILED = new XContentBuilderString("failed");
private static final XContentBuilderString FAILURES = new XContentBuilderString("failures");
}
}
}

View File

@ -22,6 +22,7 @@ package org.elasticsearch.action.admin.indices.mapping.delete;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionWriteResponse;
import org.elasticsearch.action.ShardOperationFailedException; import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.flush.FlushResponse; import org.elasticsearch.action.admin.indices.flush.FlushResponse;
@ -147,11 +148,9 @@ public class TransportDeleteMappingAction extends TransportMasterNodeOperationAc
public void onResponse(DeleteByQueryResponse deleteByQueryResponse) { public void onResponse(DeleteByQueryResponse deleteByQueryResponse) {
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
for (IndexDeleteByQueryResponse indexResponse : deleteByQueryResponse) { for (IndexDeleteByQueryResponse indexResponse : deleteByQueryResponse) {
logger.trace("Delete by query[{}] completed with total[{}], successful[{}] and failed[{}]", indexResponse.getIndex(), indexResponse.getTotalShards(), indexResponse.getSuccessfulShards(), indexResponse.getFailedShards()); logger.trace("Delete by query for index [{}] completed with total[{}], successful[{}] and failed[{}]", indexResponse.getIndex(), indexResponse.getShardInfo().getTotal(), indexResponse.getShardInfo().getSuccessful(), indexResponse.getShardInfo().getFailed());
if (indexResponse.getFailedShards() > 0) { for (ActionWriteResponse.ShardInfo.Failure failure : indexResponse.getShardInfo().getFailures()) {
for (ShardOperationFailedException failure : indexResponse.getFailures()) { logger.trace("[{}/{}/{}/{}] Delete by query shard failure reason: {}", failure.index(), failure.shardId(), failure.primary(), failure.nodeId(), failure.reason());
logger.trace("[{}/{}] Delete by query shard failure reason: {}", failure.index(), failure.shardId(), failure.reason());
}
} }
} }
} }

View File

@ -20,7 +20,7 @@
package org.elasticsearch.action.bulk; package org.elasticsearch.action.bulk;
import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionWriteResponse;
import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.action.update.UpdateResponse;
@ -104,7 +104,7 @@ public class BulkItemResponse implements Streamable {
private String opType; private String opType;
private ActionResponse response; private ActionWriteResponse response;
private Failure failure; private Failure failure;
@ -112,7 +112,7 @@ public class BulkItemResponse implements Streamable {
} }
public BulkItemResponse(int id, String opType, ActionResponse response) { public BulkItemResponse(int id, String opType, ActionWriteResponse response) {
this.id = id; this.id = id;
this.opType = opType; this.opType = opType;
this.response = response; this.response = response;
@ -210,7 +210,7 @@ public class BulkItemResponse implements Streamable {
* The actual response ({@link IndexResponse} or {@link DeleteResponse}). <tt>null</tt> in * The actual response ({@link IndexResponse} or {@link DeleteResponse}). <tt>null</tt> in
* case of failure. * case of failure.
*/ */
public <T extends ActionResponse> T getResponse() { public <T extends ActionWriteResponse> T getResponse() {
return (T) response; return (T) response;
} }

View File

@ -19,7 +19,7 @@
package org.elasticsearch.action.bulk; package org.elasticsearch.action.bulk;
import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionWriteResponse;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
@ -29,7 +29,7 @@ import java.io.IOException;
/** /**
* *
*/ */
public class BulkShardResponse extends ActionResponse { public class BulkShardResponse extends ActionWriteResponse {
private ShardId shardId; private ShardId shardId;
private BulkItemResponse[] responses; private BulkItemResponse[] responses;

View File

@ -22,7 +22,6 @@ package org.elasticsearch.action.bulk;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ExceptionsHelper;
@ -60,11 +59,7 @@ import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import java.util.HashMap; import java.util.*;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
/** /**
@ -319,6 +314,10 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
@Override @Override
public void onResponse(BulkShardResponse bulkShardResponse) { public void onResponse(BulkShardResponse bulkShardResponse) {
for (BulkItemResponse bulkItemResponse : bulkShardResponse.getResponses()) { for (BulkItemResponse bulkItemResponse : bulkShardResponse.getResponses()) {
// we may have no response if item failed
if (bulkItemResponse.getResponse() != null) {
bulkItemResponse.getResponse().setShardInfo(bulkShardResponse.getShardInfo());
}
responses.set(bulkItemResponse.getItemId(), bulkItemResponse); responses.set(bulkItemResponse.getItemId(), bulkItemResponse);
} }
if (counter.decrementAndGet() == 0) { if (counter.decrementAndGet() == 0) {

View File

@ -25,6 +25,7 @@ import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.ElasticsearchWrapperException; import org.elasticsearch.ElasticsearchWrapperException;
import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionWriteResponse;
import org.elasticsearch.action.RoutingMissingException; import org.elasticsearch.action.RoutingMissingException;
import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.delete.DeleteResponse;
@ -111,7 +112,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
@Override @Override
protected BulkShardRequest newReplicaRequestInstance() { protected BulkShardRequest newReplicaRequestInstance() {
return new BulkShardRequest(); return newRequestInstance();
} }
@Override @Override
@ -130,7 +131,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
} }
@Override @Override
protected PrimaryResponse<BulkShardResponse, BulkShardRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) { protected Tuple<BulkShardResponse, BulkShardRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) {
final BulkShardRequest request = shardRequest.request; final BulkShardRequest request = shardRequest.request;
IndexService indexService = indicesService.indexServiceSafe(request.index()); IndexService indexService = indicesService.indexServiceSafe(request.index());
IndexShard indexShard = indexService.shardSafe(shardRequest.shardId.id()); IndexShard indexShard = indexService.shardSafe(shardRequest.shardId.id());
@ -242,7 +243,6 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
updateResult = new UpdateResult(null, null, false, t, null); updateResult = new UpdateResult(null, null, false, t, null);
} }
if (updateResult.success()) { if (updateResult.success()) {
switch (updateResult.result.operation()) { switch (updateResult.result.operation()) {
case UPSERT: case UPSERT:
case INDEX: case INDEX:
@ -251,7 +251,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
BytesReference indexSourceAsBytes = indexRequest.source(); BytesReference indexSourceAsBytes = indexRequest.source();
// add the response // add the response
IndexResponse indexResponse = result.response(); IndexResponse indexResponse = result.response();
UpdateResponse updateResponse = new UpdateResponse(indexResponse.getIndex(), indexResponse.getType(), indexResponse.getId(), indexResponse.getVersion(), indexResponse.isCreated()); UpdateResponse updateResponse = new UpdateResponse(indexResponse.getShardInfo(), indexResponse.getIndex(), indexResponse.getType(), indexResponse.getId(), indexResponse.getVersion(), indexResponse.isCreated());
if (updateRequest.fields() != null && updateRequest.fields().length > 0) { if (updateRequest.fields() != null && updateRequest.fields().length > 0) {
Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(indexSourceAsBytes, true); Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(indexSourceAsBytes, true);
updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, shardRequest.request.index(), indexResponse.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), indexSourceAsBytes)); updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, shardRequest.request.index(), indexResponse.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), indexSourceAsBytes));
@ -272,7 +272,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
case DELETE: case DELETE:
DeleteResponse response = updateResult.writeResult.response(); DeleteResponse response = updateResult.writeResult.response();
DeleteRequest deleteRequest = updateResult.request(); DeleteRequest deleteRequest = updateResult.request();
updateResponse = new UpdateResponse(response.getIndex(), response.getType(), response.getId(), response.getVersion(), false); updateResponse = new UpdateResponse(response.getShardInfo(), response.getIndex(), response.getType(), response.getId(), response.getVersion(), false);
updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, shardRequest.request.index(), response.getVersion(), updateResult.result.updatedSourceAsMap(), updateResult.result.updateSourceContentType(), null)); updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, shardRequest.request.index(), response.getVersion(), updateResult.result.updatedSourceAsMap(), updateResult.result.updateSourceContentType(), null));
// Replace the update request to the translated delete request to execute on the replica. // Replace the update request to the translated delete request to execute on the replica.
item = request.items()[requestIndex] = new BulkItemRequest(request.items()[requestIndex].id(), deleteRequest); item = request.items()[requestIndex] = new BulkItemRequest(request.items()[requestIndex].id(), deleteRequest);
@ -365,8 +365,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
for (int i = 0; i < items.length; i++) { for (int i = 0; i < items.length; i++) {
responses[i] = items[i].getPrimaryResponse(); responses[i] = items[i].getPrimaryResponse();
} }
BulkShardResponse response = new BulkShardResponse(shardRequest.shardId, responses); return new Tuple<>(new BulkShardResponse(shardRequest.shardId, responses), shardRequest.request);
return new PrimaryResponse<>(shardRequest.request, response, ops);
} }
private void setResponse(BulkItemRequest request, BulkItemResponse response) { private void setResponse(BulkItemRequest request, BulkItemResponse response) {
@ -378,18 +377,21 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
static class WriteResult { static class WriteResult {
final Object response; final ActionWriteResponse response;
final String mappingTypeToUpdate; final String mappingTypeToUpdate;
final Engine.IndexingOperation op; final Engine.IndexingOperation op;
WriteResult(Object response, String mappingTypeToUpdate, Engine.IndexingOperation op) { WriteResult(ActionWriteResponse response, String mappingTypeToUpdate, Engine.IndexingOperation op) {
this.response = response; this.response = response;
this.mappingTypeToUpdate = mappingTypeToUpdate; this.mappingTypeToUpdate = mappingTypeToUpdate;
this.op = op; this.op = op;
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
<T> T response() { <T extends ActionWriteResponse> T response() {
// this sets total, pending and failed to 0 and this is ok, because we will embed this into the replica
// request and not use it
response.setShardInfo(new ActionWriteResponse.ShardInfo());
return (T) response; return (T) response;
} }

View File

@ -19,7 +19,7 @@
package org.elasticsearch.action.delete; package org.elasticsearch.action.delete;
import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionWriteResponse;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
@ -31,7 +31,7 @@ import java.io.IOException;
* @see org.elasticsearch.action.delete.DeleteRequest * @see org.elasticsearch.action.delete.DeleteRequest
* @see org.elasticsearch.client.Client#delete(DeleteRequest) * @see org.elasticsearch.client.Client#delete(DeleteRequest)
*/ */
public class DeleteResponse extends ActionResponse { public class DeleteResponse extends ActionWriteResponse {
private String index; private String index;
private String id; private String id;

View File

@ -19,7 +19,7 @@
package org.elasticsearch.action.delete; package org.elasticsearch.action.delete;
import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionWriteResponse;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
@ -28,17 +28,13 @@ import java.io.IOException;
/** /**
* Delete by query response executed on a specific index. * Delete by query response executed on a specific index.
*/ */
public class IndexDeleteResponse extends ActionResponse { public class IndexDeleteResponse extends ActionWriteResponse {
private String index; private String index;
private int successfulShards;
private int failedShards;
private ShardDeleteResponse[] deleteResponses; private ShardDeleteResponse[] deleteResponses;
IndexDeleteResponse(String index, int failedShards, ShardDeleteResponse[] deleteResponses) { IndexDeleteResponse(String index, ShardDeleteResponse[] deleteResponses) {
this.index = index; this.index = index;
this.successfulShards = deleteResponses.length;
this.failedShards = failedShards;
this.deleteResponses = deleteResponses; this.deleteResponses = deleteResponses;
} }
@ -49,27 +45,6 @@ public class IndexDeleteResponse extends ActionResponse {
return this.index; return this.index;
} }
/**
* The total number of shards the delete by query was executed on.
*/
public int getTotalShards() {
return failedShards + successfulShards;
}
/**
* The successful number of shards the delete by query was executed on.
*/
public int getSuccessfulShards() {
return successfulShards;
}
/**
* The failed number of shards the delete by query was executed on.
*/
public int getFailedShards() {
return failedShards;
}
public ShardDeleteResponse[] getResponses() { public ShardDeleteResponse[] getResponses() {
return this.deleteResponses; return this.deleteResponses;
} }

View File

@ -19,7 +19,7 @@
package org.elasticsearch.action.delete; package org.elasticsearch.action.delete;
import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionWriteResponse;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
@ -28,7 +28,7 @@ import java.io.IOException;
/** /**
* Delete response executed on a specific shard. * Delete response executed on a specific shard.
*/ */
public class ShardDeleteResponse extends ActionResponse { public class ShardDeleteResponse extends ActionWriteResponse {
private long version; private long version;
private boolean found; private boolean found;

View File

@ -33,6 +33,7 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
@ -126,7 +127,9 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct
break; break;
} }
} }
listener.onResponse(new DeleteResponse(request.concreteIndex(), request.request().type(), request.request().id(), version, found)); DeleteResponse response = new DeleteResponse(request.concreteIndex(), request.request().type(), request.request().id(), version, found);
response.setShardInfo(indexDeleteResponse.getShardInfo());
listener.onResponse(response);
} }
@Override @Override
@ -157,7 +160,7 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct
@Override @Override
protected DeleteRequest newReplicaRequestInstance() { protected DeleteRequest newReplicaRequestInstance() {
return new DeleteRequest(); return newRequestInstance();
} }
@Override @Override
@ -166,7 +169,7 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct
} }
@Override @Override
protected PrimaryResponse<DeleteResponse, DeleteRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) { protected Tuple<DeleteResponse, DeleteRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) {
DeleteRequest request = shardRequest.request; DeleteRequest request = shardRequest.request;
IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.shardId.getIndex()).shardSafe(shardRequest.shardId.id()); IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.shardId.getIndex()).shardSafe(shardRequest.shardId.id());
Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY); Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY);
@ -186,7 +189,7 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct
} }
DeleteResponse response = new DeleteResponse(shardRequest.shardId.getIndex(), request.type(), request.id(), delete.version(), delete.found()); DeleteResponse response = new DeleteResponse(shardRequest.shardId.getIndex(), request.type(), request.id(), delete.version(), delete.found());
return new PrimaryResponse<>(shardRequest.request, response, null); return new Tuple<>(response, shardRequest.request);
} }
@Override @Override

View File

@ -19,7 +19,7 @@
package org.elasticsearch.action.delete; package org.elasticsearch.action.delete;
import org.elasticsearch.action.ShardOperationFailedException; import org.elasticsearch.action.ActionWriteResponse;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.replication.TransportIndexReplicationOperationAction; import org.elasticsearch.action.support.replication.TransportIndexReplicationOperationAction;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterService;
@ -34,7 +34,7 @@ import java.util.List;
* Internal transport action that broadcasts a delete request to all of the shards that belongs to an index. * Internal transport action that broadcasts a delete request to all of the shards that belongs to an index.
* Used when routing is required but not specified within the delete request. * Used when routing is required but not specified within the delete request.
*/ */
public class TransportIndexDeleteAction extends TransportIndexReplicationOperationAction<IndexDeleteRequest, IndexDeleteResponse, ShardDeleteRequest, ShardDeleteRequest, ShardDeleteResponse> { public class TransportIndexDeleteAction extends TransportIndexReplicationOperationAction<IndexDeleteRequest, IndexDeleteResponse, ShardDeleteRequest, ShardDeleteResponse> {
private static final String ACTION_NAME = DeleteAction.NAME + "[index]"; private static final String ACTION_NAME = DeleteAction.NAME + "[index]";
@ -45,13 +45,10 @@ public class TransportIndexDeleteAction extends TransportIndexReplicationOperati
} }
@Override @Override
protected IndexDeleteResponse newResponseInstance(IndexDeleteRequest request, List<ShardDeleteResponse> shardDeleteResponses, int failuresCount, List<ShardOperationFailedException> shardFailures) { protected IndexDeleteResponse newResponseInstance(IndexDeleteRequest request, List<ShardDeleteResponse> shardDeleteResponses, ActionWriteResponse.ShardInfo shardInfo) {
return new IndexDeleteResponse(request.index(), failuresCount, shardDeleteResponses.toArray(new ShardDeleteResponse[shardDeleteResponses.size()])); IndexDeleteResponse indexDeleteResponse = new IndexDeleteResponse(request.index(), shardDeleteResponses.toArray(new ShardDeleteResponse[shardDeleteResponses.size()]));
} indexDeleteResponse.setShardInfo(shardInfo);
return indexDeleteResponse;
@Override
protected boolean accumulateExceptions() {
return false;
} }
@Override @Override

View File

@ -27,6 +27,7 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.VersionType; import org.elasticsearch.index.VersionType;
@ -62,7 +63,7 @@ public class TransportShardDeleteAction extends TransportShardReplicationOperati
@Override @Override
protected ShardDeleteRequest newReplicaRequestInstance() { protected ShardDeleteRequest newReplicaRequestInstance() {
return new ShardDeleteRequest(); return newRequestInstance();
} }
@Override @Override
@ -81,7 +82,7 @@ public class TransportShardDeleteAction extends TransportShardReplicationOperati
} }
@Override @Override
protected PrimaryResponse<ShardDeleteResponse, ShardDeleteRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) { protected Tuple<ShardDeleteResponse, ShardDeleteRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) {
ShardDeleteRequest request = shardRequest.request; ShardDeleteRequest request = shardRequest.request;
IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.shardId.getIndex()).shardSafe(shardRequest.shardId.id()); IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.shardId.getIndex()).shardSafe(shardRequest.shardId.id());
Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version(), VersionType.INTERNAL, Engine.Operation.Origin.PRIMARY); Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version(), VersionType.INTERNAL, Engine.Operation.Origin.PRIMARY);
@ -98,8 +99,7 @@ public class TransportShardDeleteAction extends TransportShardReplicationOperati
} }
ShardDeleteResponse response = new ShardDeleteResponse(delete.version(), delete.found()); return new Tuple<>(new ShardDeleteResponse(delete.version(), delete.found()), shardRequest.request);
return new PrimaryResponse<>(shardRequest.request, response, null);
} }
@Override @Override

View File

@ -64,18 +64,8 @@ public class DeleteByQueryResponse extends ActionResponse implements Iterable<In
public RestStatus status() { public RestStatus status() {
RestStatus status = RestStatus.OK; RestStatus status = RestStatus.OK;
for (IndexDeleteByQueryResponse indexResponse : indices.values()) { for (IndexDeleteByQueryResponse indexResponse : indices.values()) {
if (indexResponse.getFailedShards() > 0) { if (indexResponse.getShardInfo().status().getStatus() > status.getStatus()) {
RestStatus indexStatus = indexResponse.getFailures()[0].status(); status = indexResponse.getShardInfo().status();
if (indexResponse.getFailures().length > 1) {
for (int i = 1; i < indexResponse.getFailures().length; i++) {
if (indexResponse.getFailures()[i].status().getStatus() >= 500) {
indexStatus = indexResponse.getFailures()[i].status();
}
}
}
if (status.getStatus() < indexStatus.getStatus()) {
status = indexStatus;
}
} }
} }
return status; return status;

View File

@ -19,38 +19,25 @@
package org.elasticsearch.action.deletebyquery; package org.elasticsearch.action.deletebyquery;
import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionWriteResponse;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException; import java.io.IOException;
import java.util.List;
/** /**
* Delete by query response executed on a specific index. * Delete by query response executed on a specific index.
*/ */
public class IndexDeleteByQueryResponse extends ActionResponse { public class IndexDeleteByQueryResponse extends ActionWriteResponse {
private String index; private String index;
private int successfulShards;
private int failedShards;
private ShardOperationFailedException[] failures;
IndexDeleteByQueryResponse(String index, int successfulShards, int failedShards, List<ShardOperationFailedException> failures) { IndexDeleteByQueryResponse(String index, ShardInfo failures) {
this.index = index; this.index = index;
this.successfulShards = successfulShards; setShardInfo(failures);
this.failedShards = failedShards;
if (failures == null || failures.isEmpty()) {
this.failures = new DefaultShardOperationFailedException[0];
} else {
this.failures = failures.toArray(new ShardOperationFailedException[failures.size()]);
}
} }
IndexDeleteByQueryResponse() { IndexDeleteByQueryResponse() {
} }
/** /**
@ -60,53 +47,15 @@ public class IndexDeleteByQueryResponse extends ActionResponse {
return this.index; return this.index;
} }
/**
* The total number of shards the delete by query was executed on.
*/
public int getTotalShards() {
return failedShards + successfulShards;
}
/**
* The successful number of shards the delete by query was executed on.
*/
public int getSuccessfulShards() {
return successfulShards;
}
/**
* The failed number of shards the delete by query was executed on.
*/
public int getFailedShards() {
return failedShards;
}
public ShardOperationFailedException[] getFailures() {
return failures;
}
@Override @Override
public void readFrom(StreamInput in) throws IOException { public void readFrom(StreamInput in) throws IOException {
super.readFrom(in); super.readFrom(in);
index = in.readString(); index = in.readString();
successfulShards = in.readVInt();
failedShards = in.readVInt();
int size = in.readVInt();
failures = new ShardOperationFailedException[size];
for (int i = 0; i < size; i++) {
failures[i] = DefaultShardOperationFailedException.readShardOperationFailed(in);
}
} }
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out); super.writeTo(out);
out.writeString(index); out.writeString(index);
out.writeVInt(successfulShards);
out.writeVInt(failedShards);
out.writeVInt(failures.length);
for (ShardOperationFailedException failure : failures) {
failure.writeTo(out);
}
} }
} }

View File

@ -19,24 +19,11 @@
package org.elasticsearch.action.deletebyquery; package org.elasticsearch.action.deletebyquery;
import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionWriteResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
/** /**
* Delete by query response executed on a specific shard. * Delete by query response executed on a specific shard.
*/ */
public class ShardDeleteByQueryResponse extends ActionResponse { public class ShardDeleteByQueryResponse extends ActionWriteResponse {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
}
} }

View File

@ -40,7 +40,7 @@ import java.util.concurrent.atomic.AtomicReferenceArray;
/** /**
*/ */
public class TransportDeleteByQueryAction extends TransportIndicesReplicationOperationAction<DeleteByQueryRequest, DeleteByQueryResponse, IndexDeleteByQueryRequest, IndexDeleteByQueryResponse, ShardDeleteByQueryRequest, ShardDeleteByQueryRequest, ShardDeleteByQueryResponse> { public class TransportDeleteByQueryAction extends TransportIndicesReplicationOperationAction<DeleteByQueryRequest, DeleteByQueryResponse, IndexDeleteByQueryRequest, IndexDeleteByQueryResponse, ShardDeleteByQueryRequest, ShardDeleteByQueryResponse> {
private final DestructiveOperations destructiveOperations; private final DestructiveOperations destructiveOperations;

View File

@ -19,7 +19,7 @@
package org.elasticsearch.action.deletebyquery; package org.elasticsearch.action.deletebyquery;
import org.elasticsearch.action.ShardOperationFailedException; import org.elasticsearch.action.ActionWriteResponse;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.replication.TransportIndexReplicationOperationAction; import org.elasticsearch.action.support.replication.TransportIndexReplicationOperationAction;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterService;
@ -36,7 +36,7 @@ import java.util.List;
/** /**
* Internal transport action that broadcasts a delete by query request to all of the shards that belong to an index. * Internal transport action that broadcasts a delete by query request to all of the shards that belong to an index.
*/ */
public class TransportIndexDeleteByQueryAction extends TransportIndexReplicationOperationAction<IndexDeleteByQueryRequest, IndexDeleteByQueryResponse, ShardDeleteByQueryRequest, ShardDeleteByQueryRequest, ShardDeleteByQueryResponse> { public class TransportIndexDeleteByQueryAction extends TransportIndexReplicationOperationAction<IndexDeleteByQueryRequest, IndexDeleteByQueryResponse, ShardDeleteByQueryRequest, ShardDeleteByQueryResponse> {
private static final String ACTION_NAME = DeleteByQueryAction.NAME + "[index]"; private static final String ACTION_NAME = DeleteByQueryAction.NAME + "[index]";
@ -47,13 +47,8 @@ public class TransportIndexDeleteByQueryAction extends TransportIndexReplication
} }
@Override @Override
protected IndexDeleteByQueryResponse newResponseInstance(IndexDeleteByQueryRequest request, List<ShardDeleteByQueryResponse> shardDeleteByQueryResponses, int failuresCount, List<ShardOperationFailedException> shardFailures) { protected IndexDeleteByQueryResponse newResponseInstance(IndexDeleteByQueryRequest request, List<ShardDeleteByQueryResponse> shardDeleteByQueryResponses, ActionWriteResponse.ShardInfo shardInfo) {
return new IndexDeleteByQueryResponse(request.index(), shardDeleteByQueryResponses.size(), failuresCount, shardFailures); return new IndexDeleteByQueryResponse(request.index(), shardInfo);
}
@Override
protected boolean accumulateExceptions() {
return true;
} }
@Override @Override

View File

@ -28,6 +28,7 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.BigArrays;
@ -84,7 +85,7 @@ public class TransportShardDeleteByQueryAction extends TransportShardReplication
@Override @Override
protected ShardDeleteByQueryRequest newReplicaRequestInstance() { protected ShardDeleteByQueryRequest newReplicaRequestInstance() {
return new ShardDeleteByQueryRequest(); return newRequestInstance();
} }
@Override @Override
@ -98,7 +99,7 @@ public class TransportShardDeleteByQueryAction extends TransportShardReplication
} }
@Override @Override
protected PrimaryResponse<ShardDeleteByQueryResponse, ShardDeleteByQueryRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) { protected Tuple<ShardDeleteByQueryResponse, ShardDeleteByQueryRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) {
ShardDeleteByQueryRequest request = shardRequest.request; ShardDeleteByQueryRequest request = shardRequest.request;
IndexService indexService = indicesService.indexServiceSafe(shardRequest.shardId.getIndex()); IndexService indexService = indicesService.indexServiceSafe(shardRequest.shardId.getIndex());
IndexShard indexShard = indexService.shardSafe(shardRequest.shardId.id()); IndexShard indexShard = indexService.shardSafe(shardRequest.shardId.id());
@ -115,7 +116,7 @@ public class TransportShardDeleteByQueryAction extends TransportShardReplication
SearchContext.removeCurrent(); SearchContext.removeCurrent();
} }
} }
return new PrimaryResponse<>(shardRequest.request, new ShardDeleteByQueryResponse(), null); return new Tuple<>(new ShardDeleteByQueryResponse(), shardRequest.request);
} }

View File

@ -19,7 +19,7 @@
package org.elasticsearch.action.index; package org.elasticsearch.action.index;
import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionWriteResponse;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
@ -31,7 +31,7 @@ import java.io.IOException;
* @see org.elasticsearch.action.index.IndexRequest * @see org.elasticsearch.action.index.IndexRequest
* @see org.elasticsearch.client.Client#index(IndexRequest) * @see org.elasticsearch.client.Client#index(IndexRequest)
*/ */
public class IndexResponse extends ActionResponse { public class IndexResponse extends ActionWriteResponse {
private String index; private String index;
private String id; private String id;

View File

@ -36,6 +36,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.Engine;
@ -146,7 +147,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
@Override @Override
protected IndexRequest newReplicaRequestInstance() { protected IndexRequest newReplicaRequestInstance() {
return new IndexRequest(); return newRequestInstance();
} }
@Override @Override
@ -166,7 +167,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
} }
@Override @Override
protected PrimaryResponse<IndexResponse, IndexRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) { protected Tuple<IndexResponse, IndexRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) {
final IndexRequest request = shardRequest.request; final IndexRequest request = shardRequest.request;
// validate, if routing is required, that we got routing // validate, if routing is required, that we got routing
@ -184,7 +185,6 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
.routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl()); .routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());
long version; long version;
boolean created; boolean created;
Engine.IndexingOperation op;
if (request.opType() == IndexRequest.OpType.INDEX) { if (request.opType() == IndexRequest.OpType.INDEX) {
Engine.Index index = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates()); Engine.Index index = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates());
if (index.parsedDoc().mappingsModified()) { if (index.parsedDoc().mappingsModified()) {
@ -192,7 +192,6 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
} }
indexShard.index(index); indexShard.index(index);
version = index.version(); version = index.version();
op = index;
created = index.created(); created = index.created();
} else { } else {
Engine.Create create = indexShard.prepareCreate(sourceToParse, Engine.Create create = indexShard.prepareCreate(sourceToParse,
@ -202,7 +201,6 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
} }
indexShard.create(create); indexShard.create(create);
version = create.version(); version = create.version();
op = create;
created = true; created = true;
} }
if (request.refresh()) { if (request.refresh()) {
@ -219,8 +217,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
assert request.versionType().validateVersionForWrites(request.version()); assert request.versionType().validateVersionForWrites(request.version());
IndexResponse response = new IndexResponse(shardRequest.shardId.getIndex(), request.type(), request.id(), version, created); return new Tuple<>(new IndexResponse(shardRequest.shardId.getIndex(), request.type(), request.id(), version, created), shardRequest.request);
return new PrimaryResponse<>(shardRequest.request, response, op);
} }
@Override @Override

View File

@ -19,13 +19,13 @@
package org.elasticsearch.action.support.replication; package org.elasticsearch.action.support.replication;
import com.google.common.collect.Lists;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ShardOperationFailedException; import org.elasticsearch.action.ActionWriteResponse;
import org.elasticsearch.action.ActionWriteResponse.ShardInfo.Failure;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
@ -34,8 +34,11 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray; import java.util.concurrent.atomic.AtomicReferenceArray;
@ -45,15 +48,15 @@ import java.util.concurrent.atomic.AtomicReferenceArray;
* It relies on a shard sub-action that gets sent over the transport and executed on each of the shard. * It relies on a shard sub-action that gets sent over the transport and executed on each of the shard.
* The index provided with the request is expected to be a concrete index, properly resolved by the callers (parent actions). * The index provided with the request is expected to be a concrete index, properly resolved by the callers (parent actions).
*/ */
public abstract class TransportIndexReplicationOperationAction<Request extends IndexReplicationOperationRequest, Response extends ActionResponse, ShardRequest extends ShardReplicationOperationRequest, ShardReplicaRequest extends ShardReplicationOperationRequest, ShardResponse extends ActionResponse> public abstract class TransportIndexReplicationOperationAction<Request extends IndexReplicationOperationRequest, Response extends ActionResponse, ShardRequest extends ShardReplicationOperationRequest, ShardResponse extends ActionWriteResponse>
extends TransportAction<Request, Response> { extends TransportAction<Request, Response> {
protected final ClusterService clusterService; protected final ClusterService clusterService;
protected final TransportShardReplicationOperationAction<ShardRequest, ShardReplicaRequest, ShardResponse> shardAction; protected final TransportShardReplicationOperationAction<ShardRequest, ShardRequest, ShardResponse> shardAction;
protected TransportIndexReplicationOperationAction(Settings settings, String actionName, ClusterService clusterService, protected TransportIndexReplicationOperationAction(Settings settings, String actionName, ClusterService clusterService,
ThreadPool threadPool, TransportShardReplicationOperationAction<ShardRequest, ShardReplicaRequest, ShardResponse> shardAction, ActionFilters actionFilters) { ThreadPool threadPool, TransportShardReplicationOperationAction<ShardRequest, ShardRequest, ShardResponse> shardAction, ActionFilters actionFilters) {
super(settings, actionName, threadPool, actionFilters); super(settings, actionName, threadPool, actionFilters);
this.clusterService = clusterService; this.clusterService = clusterService;
this.shardAction = shardAction; this.shardAction = shardAction;
@ -71,7 +74,7 @@ public abstract class TransportIndexReplicationOperationAction<Request extends I
throw blockException; throw blockException;
} }
GroupShardsIterator groups; final GroupShardsIterator groups;
try { try {
groups = shards(request); groups = shards(request);
} catch (Throwable e) { } catch (Throwable e) {
@ -84,7 +87,7 @@ public abstract class TransportIndexReplicationOperationAction<Request extends I
final AtomicReferenceArray<ShardActionResult> shardsResponses = new AtomicReferenceArray<>(groups.size()); final AtomicReferenceArray<ShardActionResult> shardsResponses = new AtomicReferenceArray<>(groups.size());
for (final ShardIterator shardIt : groups) { for (final ShardIterator shardIt : groups) {
ShardRequest shardRequest = newShardRequestInstance(request, shardIt.shardId().id()); final ShardRequest shardRequest = newShardRequestInstance(request, shardIt.shardId().id());
// TODO for now, we fork operations on shardIt of the index // TODO for now, we fork operations on shardIt of the index
shardRequest.beforeLocalFork(); // optimize for local fork shardRequest.beforeLocalFork(); // optimize for local fork
@ -103,47 +106,74 @@ public abstract class TransportIndexReplicationOperationAction<Request extends I
public void onFailure(Throwable e) { public void onFailure(Throwable e) {
failureCounter.getAndIncrement(); failureCounter.getAndIncrement();
int index = indexCounter.getAndIncrement(); int index = indexCounter.getAndIncrement();
if (accumulateExceptions()) { // this is a failure for an entire shard group, constructs shard info accordingly
shardsResponses.set(index, new ShardActionResult( final RestStatus status;
new DefaultShardOperationFailedException(request.index(), shardIt.shardId().id(), e))); if (e != null && e instanceof ElasticsearchException) {
status = ((ElasticsearchException) e).status();
} else {
status = RestStatus.INTERNAL_SERVER_ERROR;
} }
Failure failure = new Failure(request.index(), shardIt.shardId().id(), null,
"Failed to execute on all shard copies [" + ExceptionsHelper.detailedMessage(e) + "]", status, true);
shardsResponses.set(index, new ShardActionResult(new ActionWriteResponse.ShardInfo(shardIt.size(), 0, 0, failure)));
returnIfNeeded(); returnIfNeeded();
} }
private void returnIfNeeded() { private void returnIfNeeded() {
if (completionCounter.decrementAndGet() == 0) { if (completionCounter.decrementAndGet() == 0) {
List<ShardResponse> responses = Lists.newArrayList(); List<ShardResponse> responses = new ArrayList<>();
List<ShardOperationFailedException> failures = Lists.newArrayList(); List<Failure> failureList = new ArrayList<>();
int total = 0;
int pending = 0;
int successful = 0;
for (int i = 0; i < shardsResponses.length(); i++) { for (int i = 0; i < shardsResponses.length(); i++) {
ShardActionResult shardActionResult = shardsResponses.get(i); ShardActionResult shardActionResult = shardsResponses.get(i);
if (shardActionResult == null) { final ActionWriteResponse.ShardInfo sf;
assert !accumulateExceptions();
continue;
}
if (shardActionResult.isFailure()) { if (shardActionResult.isFailure()) {
assert accumulateExceptions() && shardActionResult.shardFailure != null; assert shardActionResult.shardInfoOnFailure != null;
failures.add(shardActionResult.shardFailure); sf = shardActionResult.shardInfoOnFailure;
} else { } else {
responses.add(shardActionResult.shardResponse); responses.add(shardActionResult.shardResponse);
sf = shardActionResult.shardResponse.getShardInfo();
} }
total += sf.getTotal();
pending += sf.getPending();
successful += sf.getSuccessful();
failureList.addAll(Arrays.asList(sf.getFailures()));
} }
assert failureList.size() == 0 || numShardGroupFailures(failureList) == failureCounter.get();
assert failures.size() == 0 || failures.size() == failureCounter.get(); final Failure[] failures;
listener.onResponse(newResponseInstance(request, responses, failureCounter.get(), failures)); if (failureList.isEmpty()) {
failures = ActionWriteResponse.EMPTY;
} else {
failures = failureList.toArray(new Failure[failureList.size()]);
}
listener.onResponse(newResponseInstance(request, responses, new ActionWriteResponse.ShardInfo(total, successful, pending, failures)));
} }
} }
private int numShardGroupFailures(List<Failure> failures) {
int numShardGroupFailures = 0;
for (Failure failure : failures) {
if (failure.primary()) {
numShardGroupFailures++;
}
}
return numShardGroupFailures;
}
}); });
} }
} }
protected abstract Response newResponseInstance(Request request, List<ShardResponse> shardResponses, int failuresCount, List<ShardOperationFailedException> shardFailures); protected abstract Response newResponseInstance(Request request, List<ShardResponse> shardResponses, ActionWriteResponse.ShardInfo shardInfo);
protected abstract GroupShardsIterator shards(Request request) throws ElasticsearchException; protected abstract GroupShardsIterator shards(Request request) throws ElasticsearchException;
protected abstract ShardRequest newShardRequestInstance(Request request, int shardId); protected abstract ShardRequest newShardRequestInstance(Request request, int shardId);
protected abstract boolean accumulateExceptions();
protected ClusterBlockException checkGlobalBlock(ClusterState state, Request request) { protected ClusterBlockException checkGlobalBlock(ClusterState state, Request request) {
return state.blocks().globalBlockedException(ClusterBlockLevel.WRITE); return state.blocks().globalBlockedException(ClusterBlockLevel.WRITE);
} }
@ -155,22 +185,22 @@ public abstract class TransportIndexReplicationOperationAction<Request extends I
private class ShardActionResult { private class ShardActionResult {
private final ShardResponse shardResponse; private final ShardResponse shardResponse;
private final ShardOperationFailedException shardFailure; private final ActionWriteResponse.ShardInfo shardInfoOnFailure;
private ShardActionResult(ShardResponse shardResponse) { private ShardActionResult(ShardResponse shardResponse) {
assert shardResponse != null; assert shardResponse != null;
this.shardResponse = shardResponse; this.shardResponse = shardResponse;
this.shardFailure = null; this.shardInfoOnFailure = null;
} }
private ShardActionResult(ShardOperationFailedException shardOperationFailedException) { private ShardActionResult(ActionWriteResponse.ShardInfo shardInfoOnFailure) {
assert shardOperationFailedException != null; assert shardInfoOnFailure != null;
this.shardFailure = shardOperationFailedException; this.shardInfoOnFailure = shardInfoOnFailure;
this.shardResponse = null; this.shardResponse = null;
} }
boolean isFailure() { boolean isFailure() {
return shardFailure != null; return shardInfoOnFailure != null;
} }
} }
} }

View File

@ -22,6 +22,7 @@ package org.elasticsearch.action.support.replication;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionWriteResponse;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterService;
@ -41,15 +42,15 @@ import java.util.concurrent.atomic.AtomicReferenceArray;
/** /**
*/ */
public abstract class TransportIndicesReplicationOperationAction<Request extends IndicesReplicationOperationRequest, Response extends ActionResponse, IndexRequest extends IndexReplicationOperationRequest, IndexResponse extends ActionResponse, public abstract class TransportIndicesReplicationOperationAction<Request extends IndicesReplicationOperationRequest, Response extends ActionResponse, IndexRequest extends IndexReplicationOperationRequest, IndexResponse extends ActionResponse,
ShardRequest extends ShardReplicationOperationRequest, ShardReplicaRequest extends ShardReplicationOperationRequest, ShardResponse extends ActionResponse> ShardRequest extends ShardReplicationOperationRequest, ShardResponse extends ActionWriteResponse>
extends TransportAction<Request, Response> { extends TransportAction<Request, Response> {
protected final ClusterService clusterService; protected final ClusterService clusterService;
protected final TransportIndexReplicationOperationAction<IndexRequest, IndexResponse, ShardRequest, ShardReplicaRequest, ShardResponse> indexAction; protected final TransportIndexReplicationOperationAction<IndexRequest, IndexResponse, ShardRequest, ShardResponse> indexAction;
protected TransportIndicesReplicationOperationAction(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, protected TransportIndicesReplicationOperationAction(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
TransportIndexReplicationOperationAction<IndexRequest, IndexResponse, ShardRequest, ShardReplicaRequest, ShardResponse> indexAction, ActionFilters actionFilters) { TransportIndexReplicationOperationAction<IndexRequest, IndexResponse, ShardRequest, ShardResponse> indexAction, ActionFilters actionFilters) {
super(settings, actionName, threadPool, actionFilters); super(settings, actionName, threadPool, actionFilters);
this.clusterService = clusterService; this.clusterService = clusterService;
this.indexAction = indexAction; this.indexAction = indexAction;

View File

@ -19,7 +19,10 @@
package org.elasticsearch.action.support.replication; package org.elasticsearch.action.support.replication;
import org.elasticsearch.*; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.*; import org.elasticsearch.action.*;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.IndicesOptions;
@ -34,11 +37,14 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.*; import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.engine.DocumentAlreadyExistsException; import org.elasticsearch.index.engine.DocumentAlreadyExistsException;
import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexService;
@ -51,12 +57,14 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*; import org.elasticsearch.transport.*;
import java.io.IOException; import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
/** /**
*/ */
public abstract class TransportShardReplicationOperationAction<Request extends ShardReplicationOperationRequest, ReplicaRequest extends ShardReplicationOperationRequest, Response extends ActionResponse> extends TransportAction<Request, Response> { public abstract class TransportShardReplicationOperationAction<Request extends ShardReplicationOperationRequest, ReplicaRequest extends ShardReplicationOperationRequest, Response extends ActionWriteResponse> extends TransportAction<Request, Response> {
protected final TransportService transportService; protected final TransportService transportService;
protected final ClusterService clusterService; protected final ClusterService clusterService;
@ -105,17 +113,14 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
protected abstract String executor(); protected abstract String executor();
protected abstract PrimaryResponse<Response, ReplicaRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest); /**
* @return A tuple containing not null values, as first value the result of the primary operation and as second value
* the request to be executed on the replica shards.
*/
protected abstract Tuple<Response, ReplicaRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest);
protected abstract void shardOperationOnReplica(ReplicaOperationRequest shardRequest); protected abstract void shardOperationOnReplica(ReplicaOperationRequest shardRequest);
/**
* Called once replica operations have been dispatched on the
*/
protected void postPrimaryOperation(InternalRequest request, PrimaryResponse<Response, ReplicaRequest> response) {
}
protected abstract ShardIterator shards(ClusterState clusterState, InternalRequest request) throws ElasticsearchException; protected abstract ShardIterator shards(ClusterState clusterState, InternalRequest request) throws ElasticsearchException;
protected abstract boolean checkWriteConsistency(); protected abstract boolean checkWriteConsistency();
@ -142,14 +147,6 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
return TransportRequestOptions.EMPTY; return TransportRequestOptions.EMPTY;
} }
/**
* Should the operations be performed on the replicas as well. Defaults to <tt>false</tt> meaning operations
* will be executed on the replica.
*/
protected boolean ignoreReplicas() {
return false;
}
protected boolean retryPrimaryException(Throwable e) { protected boolean retryPrimaryException(Throwable e) {
return TransportActions.isShardNotAvailableException(e); return TransportActions.isShardNotAvailableException(e);
} }
@ -499,8 +496,9 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
return; return;
} }
try { try {
PrimaryResponse<Response, ReplicaRequest> response = shardOperationOnPrimary(clusterState, new PrimaryOperationRequest(primaryShardId, internalRequest.concreteIndex(), internalRequest.request())); PrimaryOperationRequest por = new PrimaryOperationRequest(primaryShardId, internalRequest.concreteIndex(), internalRequest.request());
performReplicas(response); Tuple<Response, ReplicaRequest> primaryResponse = shardOperationOnPrimary(clusterState, por);
performReplicas(por, primaryResponse);
} catch (Throwable e) { } catch (Throwable e) {
internalRequest.request.setCanHaveDuplicates(); internalRequest.request.setCanHaveDuplicates();
// shard has not been allocated yet, retry it here // shard has not been allocated yet, retry it here
@ -523,14 +521,8 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
} }
} }
void performReplicas(final PrimaryResponse<Response, ReplicaRequest> response) { void performReplicas(PrimaryOperationRequest por, Tuple<Response, ReplicaRequest> primaryResponse) {
if (ignoreReplicas()) {
postPrimaryOperation(internalRequest, response);
listener.onResponse(response.response());
return;
}
ShardRouting shard; ShardRouting shard;
// we double check on the state, if it got changed we need to make sure we take the latest one cause // we double check on the state, if it got changed we need to make sure we take the latest one cause
// maybe a replica shard started its recovery process and we need to apply it there... // maybe a replica shard started its recovery process and we need to apply it there...
@ -539,6 +531,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
// new primary shard as well... // new primary shard as well...
ClusterState newState = clusterService.state(); ClusterState newState = clusterService.state();
ShardRouting newPrimaryShard = null; ShardRouting newPrimaryShard = null;
int numberOfUnassignedReplicas = 0;
if (observer.observedState() != newState) { if (observer.observedState() != newState) {
shardIt.reset(); shardIt.reset();
ShardRouting originalPrimaryShard = null; ShardRouting originalPrimaryShard = null;
@ -561,7 +554,10 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
} else { } else {
newPrimaryShard = shard; newPrimaryShard = shard;
} }
break; }
if (!shard.primary() && shard.unassigned()) {
numberOfUnassignedReplicas++;
} }
} }
shardIt.reset(); shardIt.reset();
@ -572,40 +568,29 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
if (shard.state() != ShardRoutingState.STARTED) { if (shard.state() != ShardRoutingState.STARTED) {
internalRequest.request().setCanHaveDuplicates(); internalRequest.request().setCanHaveDuplicates();
} }
if (!shard.primary() && shard.unassigned()) {
numberOfUnassignedReplicas++;
}
} }
shardIt.reset(); shardIt.reset();
} }
// initialize the counter int numberOfPendingShardInstances = shardIt.assignedReplicasIncludingRelocating();
int replicaCounter = shardIt.assignedReplicasIncludingRelocating();
if (newPrimaryShard != null) { if (newPrimaryShard != null) {
replicaCounter++; numberOfPendingShardInstances++;
} }
ReplicationState replicationState = new ReplicationState(por, shardIt, primaryResponse.v1(), primaryResponse.v2(), listener, numberOfPendingShardInstances, numberOfUnassignedReplicas);
if (replicaCounter == 0) { if (numberOfPendingShardInstances == 0) {
postPrimaryOperation(internalRequest, response); replicationState.forceFinish();
listener.onResponse(response.response());
return; return;
} }
if (replicationType == ReplicationType.ASYNC) { if (replicationType == ReplicationType.ASYNC) {
postPrimaryOperation(internalRequest, response); replicationState.forceFinish();
// async replication, notify the listener
listener.onResponse(response.response());
// now, trick the counter so it won't decrease to 0 and notify the listeners
replicaCounter = Integer.MIN_VALUE;
} }
// we add one to the replica count to do the postPrimaryOperation
replicaCounter++;
AtomicInteger counter = new AtomicInteger(replicaCounter);
IndexMetaData indexMetaData = observer.observedState().metaData().index(internalRequest.concreteIndex()); IndexMetaData indexMetaData = observer.observedState().metaData().index(internalRequest.concreteIndex());
if (newPrimaryShard != null) { if (newPrimaryShard != null) {
performOnReplica(response, counter, newPrimaryShard, newPrimaryShard.currentNodeId(), indexMetaData); performOnReplica(replicationState, newPrimaryShard, newPrimaryShard.currentNodeId(), indexMetaData);
} }
shardIt.reset(); // reset the iterator shardIt.reset(); // reset the iterator
@ -629,56 +614,42 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
// yet that it was started. We will get an exception IllegalShardState exception if its not started // yet that it was started. We will get an exception IllegalShardState exception if its not started
// and that's fine, we will ignore it // and that's fine, we will ignore it
if (!doOnlyOnRelocating) { if (!doOnlyOnRelocating) {
performOnReplica(response, counter, shard, shard.currentNodeId(), indexMetaData); performOnReplica(replicationState, shard, shard.currentNodeId(), indexMetaData);
} }
if (shard.relocating()) { if (shard.relocating()) {
performOnReplica(response, counter, shard, shard.relocatingNodeId(), indexMetaData); performOnReplica(replicationState, shard, shard.relocatingNodeId(), indexMetaData);
} }
} }
// now do the postPrimary operation, and check if the listener needs to be invoked
postPrimaryOperation(internalRequest, response);
// we also invoke here in case replicas finish before postPrimaryAction does
if (counter.decrementAndGet() == 0) {
listener.onResponse(response.response());
}
} }
void performOnReplica(final PrimaryResponse<Response, ReplicaRequest> response, final AtomicInteger counter, final ShardRouting shard, String nodeId, final IndexMetaData indexMetaData) { void performOnReplica(final ReplicationState state, final ShardRouting shard, final String nodeId, final IndexMetaData indexMetaData) {
// if we don't have that node, it means that it might have failed and will be created again, in // if we don't have that node, it means that it might have failed and will be created again, in
// this case, we don't have to do the operation, and just let it failover // this case, we don't have to do the operation, and just let it failover
if (!observer.observedState().nodes().nodeExists(nodeId)) { if (!observer.observedState().nodes().nodeExists(nodeId)) {
if (counter.decrementAndGet() == 0) { state.onReplicaFailure(nodeId, null);
listener.onResponse(response.response());
}
return; return;
} }
final ReplicaOperationRequest shardRequest = new ReplicaOperationRequest(shardIt.shardId(), response.replicaRequest()); final ReplicaOperationRequest shardRequest = new ReplicaOperationRequest(shardIt.shardId(), state.replicaRequest());
if (!nodeId.equals(observer.observedState().nodes().localNodeId())) { if (!nodeId.equals(observer.observedState().nodes().localNodeId())) {
final DiscoveryNode node = observer.observedState().nodes().get(nodeId); final DiscoveryNode node = observer.observedState().nodes().get(nodeId);
transportService.sendRequest(node, transportReplicaAction, shardRequest, transportOptions, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { transportService.sendRequest(node, transportReplicaAction, shardRequest, transportOptions, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override @Override
public void handleResponse(TransportResponse.Empty vResponse) { public void handleResponse(TransportResponse.Empty vResponse) {
finishIfPossible(); state.onReplicaSuccess();
} }
@Override @Override
public void handleException(TransportException exp) { public void handleException(TransportException exp) {
state.onReplicaFailure(nodeId, exp);
logger.trace("[{}] Transport failure during replica request [{}] ", exp, node, internalRequest.request()); logger.trace("[{}] Transport failure during replica request [{}] ", exp, node, internalRequest.request());
if (!ignoreReplicaException(exp)) { if (!ignoreReplicaException(exp)) {
logger.warn("Failed to perform " + actionName + " on remote replica " + node + shardIt.shardId(), exp); logger.warn("Failed to perform " + actionName + " on remote replica " + node + shardIt.shardId(), exp);
shardStateAction.shardFailed(shard, indexMetaData.getUUID(), shardStateAction.shardFailed(shard, indexMetaData.getUUID(),
"Failed to perform [" + actionName + "] on replica, message [" + ExceptionsHelper.detailedMessage(exp) + "]"); "Failed to perform [" + actionName + "] on replica, message [" + ExceptionsHelper.detailedMessage(exp) + "]");
} }
finishIfPossible();
} }
private void finishIfPossible() {
if (counter.decrementAndGet() == 0) {
listener.onResponse(response.response());
}
}
}); });
} else { } else {
if (internalRequest.request().operationThreaded()) { if (internalRequest.request().operationThreaded()) {
@ -689,12 +660,11 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
protected void doRun() { protected void doRun() {
try { try {
shardOperationOnReplica(shardRequest); shardOperationOnReplica(shardRequest);
state.onReplicaSuccess();
} catch (Throwable e) { } catch (Throwable e) {
state.onReplicaFailure(nodeId, e);
failReplicaIfNeeded(shard.index(), shard.id(), e); failReplicaIfNeeded(shard.index(), shard.id(), e);
} }
if (counter.decrementAndGet() == 0) {
listener.onResponse(response.response());
}
} }
// we must never reject on because of thread pool capacity on replicas // we must never reject on because of thread pool capacity on replicas
@ -705,24 +675,20 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
@Override @Override
public void onFailure(Throwable t) { public void onFailure(Throwable t) {
state.onReplicaFailure(nodeId, t);
} }
}); });
} catch (Throwable e) { } catch (Throwable e) {
failReplicaIfNeeded(shard.index(), shard.id(), e); failReplicaIfNeeded(shard.index(), shard.id(), e);
// we want to decrement the counter here, in teh failure handling, cause we got rejected state.onReplicaFailure(nodeId, e);
// from executing on the thread pool
if (counter.decrementAndGet() == 0) {
listener.onResponse(response.response());
}
} }
} else { } else {
try { try {
shardOperationOnReplica(shardRequest); shardOperationOnReplica(shardRequest);
state.onReplicaSuccess();
} catch (Throwable e) { } catch (Throwable e) {
failReplicaIfNeeded(shard.index(), shard.id(), e); failReplicaIfNeeded(shard.index(), shard.id(), e);
} state.onReplicaFailure(nodeId, e);
if (counter.decrementAndGet() == 0) {
listener.onResponse(response.response());
} }
} }
} }
@ -798,28 +764,90 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
} }
} }
public static class PrimaryResponse<Response, ReplicaRequest> { public final class ReplicationState {
private final ReplicaRequest replicaRequest;
private final Response response;
private final Object payload;
public PrimaryResponse(ReplicaRequest replicaRequest, Response response, Object payload) { private final Request request;
private final ReplicaRequest replicaRequest;
private final Response finalResponse;
private final ShardId shardId;
private final ActionListener<Response> listener;
private final AtomicBoolean finished = new AtomicBoolean(false);
private final AtomicInteger success = new AtomicInteger(1); // We already wrote into the primary shard
private final ConcurrentMap<String, Throwable> shardReplicaFailures = ConcurrentCollections.newConcurrentMap();
private final AtomicInteger pending;
private final int numberOfShardInstances;
public ReplicationState(PrimaryOperationRequest por, ShardIterator shardsIter, Response finalResponse, ReplicaRequest replicaRequest, ActionListener<Response> listener, int numberOfPendingShardInstances, int numberOfUnassignedReplicas) {
this.request = por.request;
this.finalResponse = finalResponse;
this.replicaRequest = replicaRequest; this.replicaRequest = replicaRequest;
this.response = response; this.shardId = shardsIter.shardId();
this.payload = payload; this.listener = listener;
this.numberOfShardInstances = 1 + numberOfPendingShardInstances + numberOfUnassignedReplicas;
this.pending = new AtomicInteger(numberOfPendingShardInstances);
}
public Request request() {
return this.request;
} }
public ReplicaRequest replicaRequest() { public ReplicaRequest replicaRequest() {
return this.replicaRequest; return this.replicaRequest;
} }
public Response response() { public void onReplicaFailure(String nodeId, @Nullable Throwable e) {
return response; // Only version conflict should be ignored from being put into the _shards header?
if (e != null && !ignoreReplicaException(e)) {
shardReplicaFailures.put(nodeId, e);
}
finishIfNeeded();
} }
public Object payload() { public void onReplicaSuccess() {
return payload; success.incrementAndGet();
finishIfNeeded();
} }
public void forceFinish() {
doFinish();
}
private void finishIfNeeded() {
if (pending.decrementAndGet() == 0) {
doFinish();
}
}
private void doFinish() {
if (finished.compareAndSet(false, true)) {
final ActionWriteResponse.ShardInfo.Failure[] failuresArray;
if (!shardReplicaFailures.isEmpty()) {
int slot = 0;
failuresArray = new ActionWriteResponse.ShardInfo.Failure[shardReplicaFailures.size()];
for (Map.Entry<String, Throwable> entry : shardReplicaFailures.entrySet()) {
String reason = ExceptionsHelper.detailedMessage(entry.getValue());
RestStatus restStatus = ExceptionsHelper.status(entry.getValue());
failuresArray[slot++] = new ActionWriteResponse.ShardInfo.Failure(
shardId.getIndex(), shardId.getId(), entry.getKey(), reason, restStatus, false
);
}
} else {
failuresArray = ActionWriteResponse.EMPTY;
}
finalResponse.setShardInfo(
new ActionWriteResponse.ShardInfo(
numberOfShardInstances,
success.get(),
pending.get(),
failuresArray
)
);
listener.onResponse(finalResponse);
}
}
} }
/** /**

View File

@ -182,7 +182,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
indexAction.execute(upsertRequest, new ActionListener<IndexResponse>() { indexAction.execute(upsertRequest, new ActionListener<IndexResponse>() {
@Override @Override
public void onResponse(IndexResponse response) { public void onResponse(IndexResponse response) {
UpdateResponse update = new UpdateResponse(response.getIndex(), response.getType(), response.getId(), response.getVersion(), response.isCreated()); UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getIndex(), response.getType(), response.getId(), response.getVersion(), response.isCreated());
if (request.request().fields() != null && request.request().fields().length > 0) { if (request.request().fields() != null && request.request().fields().length > 0) {
Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(upsertSourceBytes, true); Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(upsertSourceBytes, true);
update.setGetResult(updateHelper.extractGetResult(request.request(), request.concreteIndex(), response.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), upsertSourceBytes)); update.setGetResult(updateHelper.extractGetResult(request.request(), request.concreteIndex(), response.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), upsertSourceBytes));
@ -217,7 +217,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
indexAction.execute(indexRequest, new ActionListener<IndexResponse>() { indexAction.execute(indexRequest, new ActionListener<IndexResponse>() {
@Override @Override
public void onResponse(IndexResponse response) { public void onResponse(IndexResponse response) {
UpdateResponse update = new UpdateResponse(response.getIndex(), response.getType(), response.getId(), response.getVersion(), response.isCreated()); UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getIndex(), response.getType(), response.getId(), response.getVersion(), response.isCreated());
update.setGetResult(updateHelper.extractGetResult(request.request(), request.concreteIndex(), response.getVersion(), result.updatedSourceAsMap(), result.updateSourceContentType(), indexSourceBytes)); update.setGetResult(updateHelper.extractGetResult(request.request(), request.concreteIndex(), response.getVersion(), result.updatedSourceAsMap(), result.updateSourceContentType(), indexSourceBytes));
listener.onResponse(update); listener.onResponse(update);
} }
@ -245,7 +245,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
deleteAction.execute(deleteRequest, new ActionListener<DeleteResponse>() { deleteAction.execute(deleteRequest, new ActionListener<DeleteResponse>() {
@Override @Override
public void onResponse(DeleteResponse response) { public void onResponse(DeleteResponse response) {
UpdateResponse update = new UpdateResponse(response.getIndex(), response.getType(), response.getId(), response.getVersion(), false); UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getIndex(), response.getType(), response.getId(), response.getVersion(), false);
update.setGetResult(updateHelper.extractGetResult(request.request(), request.concreteIndex(), response.getVersion(), result.updatedSourceAsMap(), result.updateSourceContentType(), null)); update.setGetResult(updateHelper.extractGetResult(request.request(), request.concreteIndex(), response.getVersion(), result.updatedSourceAsMap(), result.updateSourceContentType(), null));
listener.onResponse(update); listener.onResponse(update);
} }

View File

@ -19,7 +19,7 @@
package org.elasticsearch.action.update; package org.elasticsearch.action.update;
import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionWriteResponse;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.get.GetResult; import org.elasticsearch.index.get.GetResult;
@ -28,7 +28,7 @@ import java.io.IOException;
/** /**
*/ */
public class UpdateResponse extends ActionResponse { public class UpdateResponse extends ActionWriteResponse {
private String index; private String index;
private String id; private String id;
@ -38,10 +38,18 @@ public class UpdateResponse extends ActionResponse {
private GetResult getResult; private GetResult getResult;
public UpdateResponse() { public UpdateResponse() {
} }
/**
* Constructor to be used when a update didn't translate in a write.
* For example: update script with operation set to none
*/
public UpdateResponse(String index, String type, String id, long version, boolean created) { public UpdateResponse(String index, String type, String id, long version, boolean created) {
this(new ShardInfo(0, 0, 0), index, type, id, version, created);
}
public UpdateResponse(ShardInfo shardInfo, String index, String type, String id, long version, boolean created) {
setShardInfo(shardInfo);
this.index = index; this.index = index;
this.id = id; this.id = id;
this.type = type; this.type = type;

View File

@ -19,6 +19,7 @@
package org.elasticsearch.rest.action.bulk; package org.elasticsearch.rest.action.bulk;
import org.elasticsearch.action.ActionWriteResponse;
import org.elasticsearch.action.WriteConsistencyLevel; import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkRequest;
@ -109,10 +110,12 @@ public class RestBulkAction extends BaseRestHandler {
builder.field(Fields.STATUS, itemResponse.getFailure().getStatus().getStatus()); builder.field(Fields.STATUS, itemResponse.getFailure().getStatus().getStatus());
builder.field(Fields.ERROR, itemResponse.getFailure().getMessage()); builder.field(Fields.ERROR, itemResponse.getFailure().getMessage());
} else { } else {
ActionWriteResponse.ShardInfo shardInfo = itemResponse.getResponse().getShardInfo();
shardInfo.toXContent(builder, request);
if (itemResponse.getResponse() instanceof DeleteResponse) { if (itemResponse.getResponse() instanceof DeleteResponse) {
DeleteResponse deleteResponse = itemResponse.getResponse(); DeleteResponse deleteResponse = itemResponse.getResponse();
if (deleteResponse.isFound()) { if (deleteResponse.isFound()) {
builder.field(Fields.STATUS, RestStatus.OK.getStatus()); builder.field(Fields.STATUS, shardInfo.status());
} else { } else {
builder.field(Fields.STATUS, RestStatus.NOT_FOUND.getStatus()); builder.field(Fields.STATUS, RestStatus.NOT_FOUND.getStatus());
} }
@ -122,14 +125,14 @@ public class RestBulkAction extends BaseRestHandler {
if (indexResponse.isCreated()) { if (indexResponse.isCreated()) {
builder.field(Fields.STATUS, RestStatus.CREATED.getStatus()); builder.field(Fields.STATUS, RestStatus.CREATED.getStatus());
} else { } else {
builder.field(Fields.STATUS, RestStatus.OK.getStatus()); builder.field(Fields.STATUS, shardInfo.status());
} }
} else if (itemResponse.getResponse() instanceof UpdateResponse) { } else if (itemResponse.getResponse() instanceof UpdateResponse) {
UpdateResponse updateResponse = itemResponse.getResponse(); UpdateResponse updateResponse = itemResponse.getResponse();
if (updateResponse.isCreated()) { if (updateResponse.isCreated()) {
builder.field(Fields.STATUS, RestStatus.CREATED.getStatus()); builder.field(Fields.STATUS, RestStatus.CREATED.getStatus());
} else { } else {
builder.field(Fields.STATUS, RestStatus.OK.getStatus()); builder.field(Fields.STATUS, shardInfo.status());
} }
} }
} }

View File

@ -19,6 +19,7 @@
package org.elasticsearch.rest.action.delete; package org.elasticsearch.rest.action.delete;
import org.elasticsearch.action.ActionWriteResponse;
import org.elasticsearch.action.WriteConsistencyLevel; import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.delete.DeleteResponse;
@ -35,7 +36,6 @@ import org.elasticsearch.rest.action.support.RestBuilderListener;
import static org.elasticsearch.rest.RestRequest.Method.DELETE; import static org.elasticsearch.rest.RestRequest.Method.DELETE;
import static org.elasticsearch.rest.RestStatus.NOT_FOUND; import static org.elasticsearch.rest.RestStatus.NOT_FOUND;
import static org.elasticsearch.rest.RestStatus.OK;
/** /**
* *
@ -74,14 +74,15 @@ public class RestDeleteAction extends BaseRestHandler {
client.delete(deleteRequest, new RestBuilderListener<DeleteResponse>(channel) { client.delete(deleteRequest, new RestBuilderListener<DeleteResponse>(channel) {
@Override @Override
public RestResponse buildResponse(DeleteResponse result, XContentBuilder builder) throws Exception { public RestResponse buildResponse(DeleteResponse result, XContentBuilder builder) throws Exception {
builder.startObject() ActionWriteResponse.ShardInfo shardInfo = result.getShardInfo();
.field(Fields.FOUND, result.isFound()) builder.startObject().field(Fields.FOUND, result.isFound())
.field(Fields._INDEX, result.getIndex()) .field(Fields._INDEX, result.getIndex())
.field(Fields._TYPE, result.getType()) .field(Fields._TYPE, result.getType())
.field(Fields._ID, result.getId()) .field(Fields._ID, result.getId())
.field(Fields._VERSION, result.getVersion()) .field(Fields._VERSION, result.getVersion())
.value(shardInfo)
.endObject(); .endObject();
RestStatus status = OK; RestStatus status = shardInfo.status();
if (!result.isFound()) { if (!result.isFound()) {
status = NOT_FOUND; status = NOT_FOUND;
} }

View File

@ -19,7 +19,6 @@
package org.elasticsearch.rest.action.deletebyquery; package org.elasticsearch.rest.action.deletebyquery;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.WriteConsistencyLevel; import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.deletebyquery.DeleteByQueryRequest; import org.elasticsearch.action.deletebyquery.DeleteByQueryRequest;
import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse; import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse;
@ -90,29 +89,11 @@ public class RestDeleteByQueryAction extends BaseRestHandler {
builder.startObject(Fields._INDICES); builder.startObject(Fields._INDICES);
for (IndexDeleteByQueryResponse indexDeleteByQueryResponse : result.getIndices().values()) { for (IndexDeleteByQueryResponse indexDeleteByQueryResponse : result.getIndices().values()) {
builder.startObject(indexDeleteByQueryResponse.getIndex(), XContentBuilder.FieldCaseConversion.NONE); builder.startObject(indexDeleteByQueryResponse.getIndex(), XContentBuilder.FieldCaseConversion.NONE);
indexDeleteByQueryResponse.getShardInfo().toXContent(builder, request);
builder.startObject(Fields._SHARDS);
builder.field(Fields.TOTAL, indexDeleteByQueryResponse.getTotalShards());
builder.field(Fields.SUCCESSFUL, indexDeleteByQueryResponse.getSuccessfulShards());
builder.field(Fields.FAILED, indexDeleteByQueryResponse.getFailedShards());
ShardOperationFailedException[] failures = indexDeleteByQueryResponse.getFailures();
if (failures != null && failures.length > 0) {
builder.startArray(Fields.FAILURES);
for (ShardOperationFailedException shardFailure : failures) {
builder.startObject();
builder.field(Fields.INDEX, shardFailure.index());
builder.field(Fields.SHARD, shardFailure.shardId());
builder.field(Fields.REASON, shardFailure.reason());
builder.endObject();
}
builder.endArray();
}
builder.endObject(); builder.endObject();
builder.endObject(); builder.endObject();
} }
builder.endObject(); builder.endObject();
builder.endObject();
return new BytesRestResponse(restStatus, builder); return new BytesRestResponse(restStatus, builder);
} }
}); });

View File

@ -20,6 +20,7 @@
package org.elasticsearch.rest.action.index; package org.elasticsearch.rest.action.index;
import org.elasticsearch.ElasticsearchIllegalArgumentException; import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.action.ActionWriteResponse;
import org.elasticsearch.action.WriteConsistencyLevel; import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.index.IndexResponse;
@ -109,14 +110,16 @@ public class RestIndexAction extends BaseRestHandler {
client.index(indexRequest, new RestBuilderListener<IndexResponse>(channel) { client.index(indexRequest, new RestBuilderListener<IndexResponse>(channel) {
@Override @Override
public RestResponse buildResponse(IndexResponse response, XContentBuilder builder) throws Exception { public RestResponse buildResponse(IndexResponse response, XContentBuilder builder) throws Exception {
builder.startObject() builder.startObject();
.field(Fields._INDEX, response.getIndex()) ActionWriteResponse.ShardInfo shardInfo = response.getShardInfo();
builder.field(Fields._INDEX, response.getIndex())
.field(Fields._TYPE, response.getType()) .field(Fields._TYPE, response.getType())
.field(Fields._ID, response.getId()) .field(Fields._ID, response.getId())
.field(Fields._VERSION, response.getVersion()) .field(Fields._VERSION, response.getVersion());
.field(Fields.CREATED, response.isCreated()); shardInfo.toXContent(builder, request);
builder.field(Fields.CREATED, response.isCreated());
builder.endObject(); builder.endObject();
RestStatus status = OK; RestStatus status = shardInfo.status();
if (response.isCreated()) { if (response.isCreated()) {
status = CREATED; status = CREATED;
} }

View File

@ -19,6 +19,7 @@
package org.elasticsearch.rest.action.update; package org.elasticsearch.rest.action.update;
import org.elasticsearch.action.ActionWriteResponse;
import org.elasticsearch.action.WriteConsistencyLevel; import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.replication.ReplicationType; import org.elasticsearch.action.support.replication.ReplicationType;
@ -41,7 +42,6 @@ import java.util.Map;
import static org.elasticsearch.rest.RestRequest.Method.POST; import static org.elasticsearch.rest.RestRequest.Method.POST;
import static org.elasticsearch.rest.RestStatus.CREATED; import static org.elasticsearch.rest.RestStatus.CREATED;
import static org.elasticsearch.rest.RestStatus.OK;
/** /**
*/ */
@ -126,12 +126,14 @@ public class RestUpdateAction extends BaseRestHandler {
client.update(updateRequest, new RestBuilderListener<UpdateResponse>(channel) { client.update(updateRequest, new RestBuilderListener<UpdateResponse>(channel) {
@Override @Override
public RestResponse buildResponse(UpdateResponse response, XContentBuilder builder) throws Exception { public RestResponse buildResponse(UpdateResponse response, XContentBuilder builder) throws Exception {
builder.startObject() builder.startObject();
.field(Fields._INDEX, response.getIndex()) ActionWriteResponse.ShardInfo shardInfo = response.getShardInfo();
builder.field(Fields._INDEX, response.getIndex())
.field(Fields._TYPE, response.getType()) .field(Fields._TYPE, response.getType())
.field(Fields._ID, response.getId()) .field(Fields._ID, response.getId())
.field(Fields._VERSION, response.getVersion()); .field(Fields._VERSION, response.getVersion());
shardInfo.toXContent(builder, request);
if (response.getGetResult() != null) { if (response.getGetResult() != null) {
builder.startObject(Fields.GET); builder.startObject(Fields.GET);
response.getGetResult().toXContentEmbedded(builder, request); response.getGetResult().toXContentEmbedded(builder, request);
@ -139,7 +141,7 @@ public class RestUpdateAction extends BaseRestHandler {
} }
builder.endObject(); builder.endObject();
RestStatus status = OK; RestStatus status = shardInfo.status();
if (response.isCreated()) { if (response.isCreated()) {
status = CREATED; status = CREATED;
} }
@ -153,7 +155,6 @@ public class RestUpdateAction extends BaseRestHandler {
static final XContentBuilderString _TYPE = new XContentBuilderString("_type"); static final XContentBuilderString _TYPE = new XContentBuilderString("_type");
static final XContentBuilderString _ID = new XContentBuilderString("_id"); static final XContentBuilderString _ID = new XContentBuilderString("_id");
static final XContentBuilderString _VERSION = new XContentBuilderString("_version"); static final XContentBuilderString _VERSION = new XContentBuilderString("_version");
static final XContentBuilderString MATCHES = new XContentBuilderString("matches");
static final XContentBuilderString GET = new XContentBuilderString("get"); static final XContentBuilderString GET = new XContentBuilderString("get");
} }
} }

View File

@ -498,7 +498,7 @@ public class BasicBackwardsCompatibilityTest extends ElasticsearchBackwardsCompa
assertThat(deleteByQueryResponse.getIndices().size(), equalTo(1)); assertThat(deleteByQueryResponse.getIndices().size(), equalTo(1));
for (IndexDeleteByQueryResponse indexDeleteByQueryResponse : deleteByQueryResponse) { for (IndexDeleteByQueryResponse indexDeleteByQueryResponse : deleteByQueryResponse) {
assertThat(indexDeleteByQueryResponse.getIndex(), equalTo("test")); assertThat(indexDeleteByQueryResponse.getIndex(), equalTo("test"));
assertThat(indexDeleteByQueryResponse.getFailures().length, equalTo(0)); assertThat(indexDeleteByQueryResponse.getShardInfo().getFailures().length, equalTo(0));
} }
refresh(); refresh();

View File

@ -19,7 +19,7 @@
package org.elasticsearch.deleteByQuery; package org.elasticsearch.deleteByQuery;
import org.elasticsearch.action.ShardOperationFailedException; import org.elasticsearch.action.ActionWriteResponse;
import org.elasticsearch.action.admin.indices.alias.Alias; import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.deletebyquery.DeleteByQueryRequestBuilder; import org.elasticsearch.action.deletebyquery.DeleteByQueryRequestBuilder;
import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse; import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse;
@ -35,9 +35,7 @@ import org.junit.Test;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.*;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.hamcrest.Matchers.*; import static org.hamcrest.Matchers.*;
public class DeleteByQueryTests extends ElasticsearchIntegrationTest { public class DeleteByQueryTests extends ElasticsearchIntegrationTest {
@ -65,10 +63,9 @@ public class DeleteByQueryTests extends ElasticsearchIntegrationTest {
DeleteByQueryRequestBuilder deleteByQueryRequestBuilder = client().prepareDeleteByQuery(); DeleteByQueryRequestBuilder deleteByQueryRequestBuilder = client().prepareDeleteByQuery();
deleteByQueryRequestBuilder.setQuery(QueryBuilders.matchAllQuery()); deleteByQueryRequestBuilder.setQuery(QueryBuilders.matchAllQuery());
DeleteByQueryResponse actionGet = deleteByQueryRequestBuilder.execute().actionGet(); DeleteByQueryResponse response = deleteByQueryRequestBuilder.execute().actionGet();
assertThat(actionGet.status(), equalTo(RestStatus.OK)); assertThat(response.status(), equalTo(RestStatus.OK));
assertThat(actionGet.getIndex("twitter"), notNullValue()); assertSyncShardInfo(response.getIndex("twitter").getShardInfo(), getNumShards("twitter"));
assertThat(actionGet.getIndex("twitter").getFailedShards(), equalTo(0));
client().admin().indices().prepareRefresh().execute().actionGet(); client().admin().indices().prepareRefresh().execute().actionGet();
search = client().prepareSearch().setQuery(QueryBuilders.matchAllQuery()).execute().actionGet(); search = client().prepareSearch().setQuery(QueryBuilders.matchAllQuery()).execute().actionGet();
@ -96,10 +93,9 @@ public class DeleteByQueryTests extends ElasticsearchIntegrationTest {
} }
deleteByQueryRequestBuilder.setIndicesOptions(IndicesOptions.lenientExpandOpen()); deleteByQueryRequestBuilder.setIndicesOptions(IndicesOptions.lenientExpandOpen());
DeleteByQueryResponse actionGet = deleteByQueryRequestBuilder.execute().actionGet(); DeleteByQueryResponse response = deleteByQueryRequestBuilder.execute().actionGet();
assertThat(actionGet.status(), equalTo(RestStatus.OK)); assertThat(response.status(), equalTo(RestStatus.OK));
assertThat(actionGet.getIndex("twitter").getFailedShards(), equalTo(0)); assertSyncShardInfo(response.getIndex("twitter").getShardInfo(), getNumShards("twitter"));
assertThat(actionGet.getIndex("twitter"), notNullValue());
client().admin().indices().prepareRefresh().execute().actionGet(); client().admin().indices().prepareRefresh().execute().actionGet();
search = client().prepareSearch().setQuery(QueryBuilders.matchAllQuery()).execute().actionGet(); search = client().prepareSearch().setQuery(QueryBuilders.matchAllQuery()).execute().actionGet();
@ -117,12 +113,11 @@ public class DeleteByQueryTests extends ElasticsearchIntegrationTest {
NumShards twitter = getNumShards("test"); NumShards twitter = getNumShards("test");
assertThat(response.status(), equalTo(RestStatus.BAD_REQUEST)); assertThat(response.status(), equalTo(RestStatus.BAD_REQUEST));
assertThat(response.getIndex("test").getSuccessfulShards(), equalTo(0)); assertThat(response.getIndex("test").getShardInfo().getSuccessful(), equalTo(0));
assertThat(response.getIndex("test").getFailedShards(), equalTo(twitter.numPrimaries)); assertThat(response.getIndex("test").getShardInfo().getFailures().length, equalTo(twitter.numPrimaries));
assertThat(response.getIndices().size(), equalTo(1)); assertThat(response.getIndices().size(), equalTo(1));
assertThat(response.getIndices().get("test").getFailedShards(), equalTo(twitter.numPrimaries)); assertThat(response.getIndices().get("test").getShardInfo().getFailures().length, equalTo(twitter.numPrimaries));
assertThat(response.getIndices().get("test").getFailures().length, equalTo(twitter.numPrimaries)); for (ActionWriteResponse.ShardInfo.Failure failure : response.getIndices().get("test").getShardInfo().getFailures()) {
for (ShardOperationFailedException failure : response.getIndices().get("test").getFailures()) {
assertThat(failure.reason(), containsString("[test] [has_child] query and filter unsupported in delete_by_query api")); assertThat(failure.reason(), containsString("[test] [has_child] query and filter unsupported in delete_by_query api"));
assertThat(failure.status(), equalTo(RestStatus.BAD_REQUEST)); assertThat(failure.status(), equalTo(RestStatus.BAD_REQUEST));
assertThat(failure.shardId(), greaterThan(-1)); assertThat(failure.shardId(), greaterThan(-1));
@ -182,7 +177,7 @@ public class DeleteByQueryTests extends ElasticsearchIntegrationTest {
assertThat(deleteByQueryResponse.getIndices().size(), equalTo(1)); assertThat(deleteByQueryResponse.getIndices().size(), equalTo(1));
for (IndexDeleteByQueryResponse indexDeleteByQueryResponse : deleteByQueryResponse) { for (IndexDeleteByQueryResponse indexDeleteByQueryResponse : deleteByQueryResponse) {
assertThat(indexDeleteByQueryResponse.getIndex(), equalTo("test")); assertThat(indexDeleteByQueryResponse.getIndex(), equalTo("test"));
assertThat(indexDeleteByQueryResponse.getFailures().length, equalTo(0)); assertThat(indexDeleteByQueryResponse.getShardInfo().getFailures().length, equalTo(0));
} }
refresh(); refresh();
@ -194,4 +189,14 @@ public class DeleteByQueryTests extends ElasticsearchIntegrationTest {
private static String indexOrAlias() { private static String indexOrAlias() {
return randomBoolean() ? "test" : "alias"; return randomBoolean() ? "test" : "alias";
} }
private void assertSyncShardInfo(ActionWriteResponse.ShardInfo shardInfo, NumShards numShards) {
assertThat(shardInfo.getTotal(), equalTo(numShards.totalNumShards));
assertThat(shardInfo.getSuccessful(), greaterThanOrEqualTo(numShards.numPrimaries));
assertThat(shardInfo.getPending(), equalTo(0));
assertThat(shardInfo.getFailed(), equalTo(0));
for (ActionWriteResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
assertThat(failure.status(), equalTo(RestStatus.OK));
}
}
} }

View File

@ -187,8 +187,9 @@ public class DocumentActionsTests extends ElasticsearchIntegrationTest {
logger.info("Delete by query"); logger.info("Delete by query");
DeleteByQueryResponse queryResponse = client().prepareDeleteByQuery().setIndices("test").setQuery(termQuery("name", "test2")).execute().actionGet(); DeleteByQueryResponse queryResponse = client().prepareDeleteByQuery().setIndices("test").setQuery(termQuery("name", "test2")).execute().actionGet();
assertThat(queryResponse.getIndex(getConcreteIndexName()).getSuccessfulShards(), equalTo(numShards.numPrimaries)); assertThat(queryResponse.getIndex(getConcreteIndexName()).getShardInfo().getTotal(), equalTo(numShards.totalNumShards));
assertThat(queryResponse.getIndex(getConcreteIndexName()).getFailedShards(), equalTo(0)); assertThat(queryResponse.getIndex(getConcreteIndexName()).getShardInfo().getSuccessful(), equalTo(numShards.totalNumShards));
assertThat(queryResponse.getIndex(getConcreteIndexName()).getShardInfo().getFailures().length, equalTo(0));
client().admin().indices().refresh(refreshRequest("test")).actionGet(); client().admin().indices().refresh(refreshRequest("test")).actionGet();
logger.info("Get [type1/1] and [type1/2], should be empty"); logger.info("Get [type1/1] and [type1/2], should be empty");

View File

@ -0,0 +1,185 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.document;
import org.elasticsearch.action.ActionWriteResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.deletebyquery.IndexDeleteByQueryResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.replication.ReplicationType;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Test;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.*;
/**
*/
public class ShardInfoTests extends ElasticsearchIntegrationTest {
private int numCopies;
private int numNodes;
@Test
public void testIndexAndDelete() throws Exception {
prepareIndex(1);
IndexResponse indexResponse = client().prepareIndex("idx", "type").setSource("{}").get();
assertShardInfo(indexResponse);
DeleteResponse deleteResponse = client().prepareDelete("idx", "type", indexResponse.getId()).get();
assertShardInfo(deleteResponse);
}
@Test
public void testUpdate() throws Exception {
prepareIndex(1);
UpdateResponse updateResponse = client().prepareUpdate("idx", "type", "1").setDoc("{}").setDocAsUpsert(true).get();
assertShardInfo(updateResponse);
}
@Test
public void testBulk_withIndexAndDeleteItems() throws Exception {
prepareIndex(1);
BulkRequestBuilder bulkRequestBuilder = client().prepareBulk();
for (int i = 0; i < 10; i++) {
bulkRequestBuilder.add(client().prepareIndex("idx", "type").setSource("{}"));
}
BulkResponse bulkResponse = bulkRequestBuilder.get();
bulkRequestBuilder = client().prepareBulk();
for (BulkItemResponse item : bulkResponse) {
assertThat(item.isFailed(), equalTo(false));
assertShardInfo(item.getResponse());
bulkRequestBuilder.add(client().prepareDelete("idx", "type", item.getId()));
}
bulkResponse = bulkRequestBuilder.get();
for (BulkItemResponse item : bulkResponse) {
assertThat(item.isFailed(), equalTo(false));
assertShardInfo(item.getResponse());
}
}
@Test
public void testBulk_withUpdateItems() throws Exception {
prepareIndex(1);
BulkRequestBuilder bulkRequestBuilder = client().prepareBulk();
for (int i = 0; i < 10; i++) {
bulkRequestBuilder.add(client().prepareUpdate("idx", "type", Integer.toString(i)).setDoc("{}").setDocAsUpsert(true));
}
BulkResponse bulkResponse = bulkRequestBuilder.get();
for (BulkItemResponse item : bulkResponse) {
assertThat(item.isFailed(), equalTo(false));
assertShardInfo(item.getResponse());
}
}
@Test
public void testDeleteWithRoutingRequiredButNotSpecified() throws Exception {
int numPrimaryShards = randomIntBetween(1, 2);
prepareIndex(numPrimaryShards, true);
DeleteResponse deleteResponse = client().prepareDelete("idx", "type", "1").get();
assertShardInfo(deleteResponse, numCopies * numPrimaryShards, numNodes * numPrimaryShards, 0);
}
@Test
public void testDeleteByQuery() throws Exception {
int numPrimaryShards = randomIntBetween(1, 2);
prepareIndex(numPrimaryShards);
IndexDeleteByQueryResponse indexDeleteByQueryResponse = client().prepareDeleteByQuery("idx")
.setQuery(QueryBuilders.matchAllQuery())
.get().getIndex("idx");
assertShardInfo(indexDeleteByQueryResponse, numCopies * numPrimaryShards, numNodes * numPrimaryShards, 0);
}
@Test
public void testIndexWithAsyncReplication() throws Exception {
prepareIndex(1);
IndexResponse indexResponse = client().prepareIndex("idx", "type")
.setReplicationType(ReplicationType.ASYNC)
.setSource("{}")
.get();
assertShardInfo(indexResponse, numCopies, 1, numNodes - 1);
}
private void prepareIndex(int numberOfPrimaryShards) throws Exception {
prepareIndex(numberOfPrimaryShards, false);
}
private void prepareIndex(int numberOfPrimaryShards, boolean routingRequired) throws Exception {
numNodes = cluster().numDataNodes();
logger.info("Number of nodes: {}", numNodes);
int maxNumberOfCopies = (numNodes * 2) - 1;
numCopies = randomIntBetween(numNodes, maxNumberOfCopies);
logger.info("Number of copies: {}", numCopies);
assertAcked(prepareCreate("idx").setSettings(
ImmutableSettings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, numberOfPrimaryShards)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, numCopies - 1))
.addMapping("type", "_routing", "required=" + routingRequired)
.get());
for (int i = 0; i < numberOfPrimaryShards; i++) {
ensureActiveShardCopies(i, numNodes);
}
}
private void assertShardInfo(ActionWriteResponse response) {
assertShardInfo(response, numCopies, numNodes, 0);
}
private void assertShardInfo(ActionWriteResponse response, int expectedTotal, int expectedSuccessful, int expectedPending) {
assertThat(response.getShardInfo().getTotal(), equalTo(expectedTotal));
assertThat(response.getShardInfo().getSuccessful(), equalTo(expectedSuccessful));
assertThat(response.getShardInfo().getPending(), equalTo(expectedPending));
}
private void ensureActiveShardCopies(final int shardId, final int copyCount) throws Exception {
assertBusy(new Runnable() {
@Override
public void run() {
ClusterState state = client().admin().cluster().prepareState().get().getState();
assertThat(state.routingTable().index("idx"), not(nullValue()));
assertThat(state.routingTable().index("idx").shard(shardId), not(nullValue()));
assertThat(state.routingTable().index("idx").shard(shardId).activeShards().size(), equalTo(copyCount));
ClusterHealthResponse healthResponse = client().admin().cluster().prepareHealth("idx")
.setWaitForRelocatingShards(0)
.get();
assertThat(healthResponse.isTimedOut(), equalTo(false));
RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries("idx")
.setActiveOnly(true)
.get();
assertThat(recoveryResponse.shardResponses().get("idx").size(), equalTo(0));
}
});
}
}

View File

@ -1390,9 +1390,9 @@ public class SimpleChildQuerySearchTests extends ElasticsearchIntegrationTest {
// Delete by query doesn't support p/c queries. If the delete by query has a different execution mode // Delete by query doesn't support p/c queries. If the delete by query has a different execution mode
// that doesn't rely on IW#deleteByQuery() then this test can be changed. // that doesn't rely on IW#deleteByQuery() then this test can be changed.
DeleteByQueryResponse deleteByQueryResponse = client().prepareDeleteByQuery("test").setQuery(randomHasChild("child", "c_field", "blue")).get(); DeleteByQueryResponse deleteByQueryResponse = client().prepareDeleteByQuery("test").setQuery(randomHasChild("child", "c_field", "blue")).get();
assertThat(deleteByQueryResponse.getIndex("test").getSuccessfulShards(), equalTo(0)); assertThat(deleteByQueryResponse.getIndex("test").getShardInfo().getSuccessful(), equalTo(0));
assertThat(deleteByQueryResponse.getIndex("test").getFailedShards(), equalTo(getNumShards("test").numPrimaries)); assertThat(deleteByQueryResponse.getIndex("test").getShardInfo().getFailures().length, equalTo(getNumShards("test").numPrimaries));
assertThat(deleteByQueryResponse.getIndex("test").getFailures()[0].reason(), containsString("[has_child] query and filter unsupported in delete_by_query api")); assertThat(deleteByQueryResponse.getIndex("test").getShardInfo().getFailures()[0].reason(), containsString("[has_child] query and filter unsupported in delete_by_query api"));
client().admin().indices().prepareRefresh("test").get(); client().admin().indices().prepareRefresh("test").get();
searchResponse = client().prepareSearch("test") searchResponse = client().prepareSearch("test")
@ -1435,9 +1435,9 @@ public class SimpleChildQuerySearchTests extends ElasticsearchIntegrationTest {
assertHitCount(searchResponse, 3l); assertHitCount(searchResponse, 3l);
DeleteByQueryResponse deleteByQueryResponse = client().prepareDeleteByQuery("test").setQuery(randomHasChild("child", "c_field", "blue")).get(); DeleteByQueryResponse deleteByQueryResponse = client().prepareDeleteByQuery("test").setQuery(randomHasChild("child", "c_field", "blue")).get();
assertThat(deleteByQueryResponse.getIndex("test").getSuccessfulShards(), equalTo(0)); assertThat(deleteByQueryResponse.getIndex("test").getShardInfo().getSuccessful(), equalTo(0));
assertThat(deleteByQueryResponse.getIndex("test").getFailedShards(), equalTo(getNumShards("test").numPrimaries)); assertThat(deleteByQueryResponse.getIndex("test").getShardInfo().getFailures().length, equalTo(getNumShards("test").numPrimaries));
assertThat(deleteByQueryResponse.getIndex("test").getFailures()[0].reason(), containsString("[has_child] query and filter unsupported in delete_by_query api")); assertThat(deleteByQueryResponse.getIndex("test").getShardInfo().getFailures()[0].reason(), containsString("[has_child] query and filter unsupported in delete_by_query api"));
client().admin().indices().prepareRefresh("test").get(); client().admin().indices().prepareRefresh("test").get();
searchResponse = client().prepareSearch("test") searchResponse = client().prepareSearch("test")
@ -1488,9 +1488,9 @@ public class SimpleChildQuerySearchTests extends ElasticsearchIntegrationTest {
DeleteByQueryResponse deleteByQueryResponse = client().prepareDeleteByQuery("test") DeleteByQueryResponse deleteByQueryResponse = client().prepareDeleteByQuery("test")
.setQuery(randomHasParent("parent", "p_field", "p_value2")) .setQuery(randomHasParent("parent", "p_field", "p_value2"))
.get(); .get();
assertThat(deleteByQueryResponse.getIndex("test").getSuccessfulShards(), equalTo(0)); assertThat(deleteByQueryResponse.getIndex("test").getShardInfo().getSuccessful(), equalTo(0));
assertThat(deleteByQueryResponse.getIndex("test").getFailedShards(), equalTo(getNumShards("test").numPrimaries)); assertThat(deleteByQueryResponse.getIndex("test").getShardInfo().getFailures().length, equalTo(getNumShards("test").numPrimaries));
assertThat(deleteByQueryResponse.getIndex("test").getFailures()[0].reason(), containsString("[has_parent] query and filter unsupported in delete_by_query api")); assertThat(deleteByQueryResponse.getIndex("test").getShardInfo().getFailures()[0].reason(), containsString("[has_parent] query and filter unsupported in delete_by_query api"));
client().admin().indices().prepareRefresh("test").get(); client().admin().indices().prepareRefresh("test").get();
client().admin().indices().prepareRefresh("test").get(); client().admin().indices().prepareRefresh("test").get();
client().admin().indices().prepareRefresh("test").get(); client().admin().indices().prepareRefresh("test").get();