bulk response has errors indication + status per item

closes #4002
This commit is contained in:
Shay Banon 2013-12-11 16:04:41 +01:00
parent 10cdb0ae22
commit a3f1c428c2
4 changed files with 68 additions and 15 deletions

View File

@ -19,6 +19,7 @@
package org.elasticsearch.action.bulk;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexResponse;
@ -26,6 +27,7 @@ import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.rest.RestStatus;
import java.io.IOException;
@ -43,12 +45,23 @@ public class BulkItemResponse implements Streamable {
private final String type;
private final String id;
private final String message;
private final RestStatus status;
public Failure(String index, String type, String id, String message) {
public Failure(String index, String type, String id, Throwable t) {
this.index = index;
this.type = type;
this.id = id;
this.message = ExceptionsHelper.detailedMessage(t);
this.status = ExceptionsHelper.status(t);
}
public Failure(String index, String type, String id, String message, RestStatus status) {
this.index = index;
this.type = type;
this.id = id;
this.message = message;
this.status = status;
}
/**
@ -78,6 +91,13 @@ public class BulkItemResponse implements Streamable {
public String getMessage() {
return this.message;
}
/**
* The rest status.
*/
public RestStatus getStatus() {
return this.status;
}
}
private int id;
@ -242,7 +262,12 @@ public class BulkItemResponse implements Streamable {
}
if (in.readBoolean()) {
failure = new Failure(in.readSharedString(), in.readSharedString(), in.readString(), in.readString());
String fIndex = in.readSharedString();
String fType = in.readSharedString();
String fId = in.readString();
String fMessage = in.readString();
RestStatus status = RestStatus.readFrom(in);
failure = new Failure(fIndex, fType, fId, fMessage, status);
}
}
@ -271,6 +296,7 @@ public class BulkItemResponse implements Streamable {
out.writeSharedString(failure.getType());
out.writeString(failure.getId());
out.writeString(failure.getMessage());
RestStatus.writeTo(out, failure.getStatus());
}
}
}

View File

@ -45,6 +45,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BaseTransportRequestHandler;
import org.elasticsearch.transport.TransportChannel;
@ -257,19 +258,20 @@ public class TransportBulkAction extends TransportAction<BulkRequest, BulkRespon
public void onFailure(Throwable e) {
// create failures for all relevant requests
String message = ExceptionsHelper.detailedMessage(e);
RestStatus status = ExceptionsHelper.status(e);
for (BulkItemRequest request : requests) {
if (request.request() instanceof IndexRequest) {
IndexRequest indexRequest = (IndexRequest) request.request();
responses.set(request.id(), new BulkItemResponse(request.id(), indexRequest.opType().toString().toLowerCase(Locale.ENGLISH),
new BulkItemResponse.Failure(indexRequest.index(), indexRequest.type(), indexRequest.id(), message)));
new BulkItemResponse.Failure(indexRequest.index(), indexRequest.type(), indexRequest.id(), message, status)));
} else if (request.request() instanceof DeleteRequest) {
DeleteRequest deleteRequest = (DeleteRequest) request.request();
responses.set(request.id(), new BulkItemResponse(request.id(), "delete",
new BulkItemResponse.Failure(deleteRequest.index(), deleteRequest.type(), deleteRequest.id(), message)));
new BulkItemResponse.Failure(deleteRequest.index(), deleteRequest.type(), deleteRequest.id(), message, status)));
} else if (request.request() instanceof UpdateRequest) {
UpdateRequest updateRequest = (UpdateRequest) request.request();
responses.set(request.id(), new BulkItemResponse(request.id(), "update",
new BulkItemResponse.Failure(updateRequest.index(), updateRequest.type(), updateRequest.id(), message)));
new BulkItemResponse.Failure(updateRequest.index(), updateRequest.type(), updateRequest.id(), message, status)));
}
}
if (counter.decrementAndGet() == 0) {

View File

@ -182,7 +182,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
logger.debug("[{}][{}] failed to execute bulk item (index) {}", e, shardRequest.request.index(), shardRequest.shardId, indexRequest);
}
responses[requestIndex] = new BulkItemResponse(item.id(), indexRequest.opType().lowercase(),
new BulkItemResponse.Failure(indexRequest.index(), indexRequest.type(), indexRequest.id(), ExceptionsHelper.detailedMessage(e)));
new BulkItemResponse.Failure(indexRequest.index(), indexRequest.type(), indexRequest.id(), e));
// nullify the request so it won't execute on the replicas
request.items()[requestIndex] = null;
}
@ -207,7 +207,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
logger.debug("[{}][{}] failed to execute bulk item (delete) {}", e, shardRequest.request.index(), shardRequest.shardId, deleteRequest);
}
responses[requestIndex] = new BulkItemResponse(item.id(), "delete",
new BulkItemResponse.Failure(deleteRequest.index(), deleteRequest.type(), deleteRequest.id(), ExceptionsHelper.detailedMessage(e)));
new BulkItemResponse.Failure(deleteRequest.index(), deleteRequest.type(), deleteRequest.id(), e));
// nullify the request so it won't execute on the replicas
request.items()[requestIndex] = null;
}
@ -276,7 +276,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
if (updateAttemptsCount >= updateRequest.retryOnConflict()) {
// we can't try any more
responses[requestIndex] = new BulkItemResponse(item.id(), "update",
new BulkItemResponse.Failure(updateRequest.index(), updateRequest.type(), updateRequest.id(), ExceptionsHelper.detailedMessage(t)));
new BulkItemResponse.Failure(updateRequest.index(), updateRequest.type(), updateRequest.id(), t));
;
request.items()[requestIndex] = null; // do not send to replicas
@ -291,7 +291,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
throw (ElasticSearchException) t;
}
if (updateResult.result == null) {
responses[requestIndex] = new BulkItemResponse(item.id(), "update", new BulkItemResponse.Failure(updateRequest.index(), updateRequest.type(), updateRequest.id(), ExceptionsHelper.detailedMessage(t)));
responses[requestIndex] = new BulkItemResponse(item.id(), "update", new BulkItemResponse.Failure(updateRequest.index(), updateRequest.type(), updateRequest.id(), t));
} else {
switch (updateResult.result.operation()) {
case UPSERT:
@ -303,7 +303,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
logger.debug("[{}][{}] failed to execute bulk item (index) {}", t, shardRequest.request.index(), shardRequest.shardId, indexRequest);
}
responses[requestIndex] = new BulkItemResponse(item.id(), indexRequest.opType().lowercase(),
new BulkItemResponse.Failure(indexRequest.index(), indexRequest.type(), indexRequest.id(), ExceptionsHelper.detailedMessage(t)));
new BulkItemResponse.Failure(indexRequest.index(), indexRequest.type(), indexRequest.id(), t));
break;
case DELETE:
DeleteRequest deleteRequest = updateResult.request();
@ -313,7 +313,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
logger.debug("[{}][{}] failed to execute bulk item (delete) {}", t, shardRequest.request.index(), shardRequest.shardId, deleteRequest);
}
responses[requestIndex] = new BulkItemResponse(item.id(), "delete",
new BulkItemResponse.Failure(deleteRequest.index(), deleteRequest.type(), deleteRequest.id(), ExceptionsHelper.detailedMessage(t)));
new BulkItemResponse.Failure(deleteRequest.index(), deleteRequest.type(), deleteRequest.id(), t));
break;
}
}

View File

@ -26,7 +26,9 @@ import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.bulk.BulkShardRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.replication.ReplicationType;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.inject.Inject;
@ -107,6 +109,7 @@ public class RestBulkAction extends BaseRestHandler {
XContentBuilder builder = restContentBuilder(request);
builder.startObject();
builder.field(Fields.TOOK, response.getTookInMillis());
builder.field(Fields.ERRORS, response.hasFailures());
builder.startArray(Fields.ITEMS);
for (BulkItemResponse itemResponse : response) {
builder.startObject();
@ -119,13 +122,33 @@ public class RestBulkAction extends BaseRestHandler {
builder.field(Fields._VERSION, itemResponse.getVersion());
}
if (itemResponse.isFailed()) {
builder.field(Fields.STATUS, itemResponse.getFailure().getStatus().getStatus());
builder.field(Fields.ERROR, itemResponse.getFailure().getMessage());
} else {
builder.field(Fields.OK, true);
}
if (itemResponse.getResponse() instanceof DeleteResponse) {
DeleteResponse deleteResponse = itemResponse.getResponse();
builder.field(Fields.FOUND, !deleteResponse.isNotFound());
if (itemResponse.getResponse() instanceof DeleteResponse) {
DeleteResponse deleteResponse = itemResponse.getResponse();
if (deleteResponse.isNotFound()) {
builder.field(Fields.STATUS, RestStatus.NOT_FOUND.getStatus());
} else {
builder.field(Fields.STATUS, RestStatus.OK.getStatus());
}
builder.field(Fields.FOUND, !deleteResponse.isNotFound());
} else if (itemResponse.getResponse() instanceof IndexResponse) {
IndexResponse indexResponse = itemResponse.getResponse();
if (indexResponse.isCreated()) {
builder.field(Fields.STATUS, RestStatus.CREATED.getStatus());
} else {
builder.field(Fields.STATUS, RestStatus.OK.getStatus());
}
} else if (itemResponse.getResponse() instanceof UpdateResponse) {
UpdateResponse updateResponse = itemResponse.getResponse();
if (updateResponse.isCreated()) {
builder.field(Fields.STATUS, RestStatus.CREATED.getStatus());
} else {
builder.field(Fields.STATUS, RestStatus.OK.getStatus());
}
}
}
builder.endObject();
builder.endObject();
@ -152,9 +175,11 @@ public class RestBulkAction extends BaseRestHandler {
static final class Fields {
static final XContentBuilderString ITEMS = new XContentBuilderString("items");
static final XContentBuilderString ERRORS = new XContentBuilderString("errors");
static final XContentBuilderString _INDEX = new XContentBuilderString("_index");
static final XContentBuilderString _TYPE = new XContentBuilderString("_type");
static final XContentBuilderString _ID = new XContentBuilderString("_id");
static final XContentBuilderString STATUS = new XContentBuilderString("status");
static final XContentBuilderString ERROR = new XContentBuilderString("error");
static final XContentBuilderString OK = new XContentBuilderString("ok");
static final XContentBuilderString TOOK = new XContentBuilderString("took");