diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/BulkResponse.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/BulkResponse.java index dcde45f2f85..40505d5f32d 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/BulkResponse.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/BulkResponse.java @@ -23,6 +23,7 @@ import org.elasticsearch.action.ActionResponse; import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.TimeValue; import java.io.IOException; import java.util.Iterator; @@ -38,11 +39,42 @@ public class BulkResponse implements ActionResponse, Iterable private BulkItemResponse[] responses; + private long tookInMillis; + BulkResponse() { } - public BulkResponse(BulkItemResponse[] responses) { + public BulkResponse(BulkItemResponse[] responses, long tookInMillis) { this.responses = responses; + this.tookInMillis = tookInMillis; + } + + /** + * How long the bulk execution took. + */ + public TimeValue took() { + return new TimeValue(tookInMillis); + } + + /** + * How long the bulk execution took. + */ + public TimeValue getTook() { + return took(); + } + + /** + * How long the bulk execution took in milliseconds. + */ + public long tookInMillis() { + return tookInMillis; + } + + /** + * How long the bulk execution took in milliseconds. + */ + public long getTookInMillis() { + return tookInMillis(); } /** diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index 9c0dc143343..6fb1ec0b955 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -86,6 +86,7 @@ public class TransportBulkAction extends BaseAction { } @Override protected void doExecute(final BulkRequest bulkRequest, final ActionListener listener) { + final long startTime = System.currentTimeMillis(); Set indices = Sets.newHashSet(); for (ActionRequest request : bulkRequest.requests) { if (request instanceof IndexRequest) { @@ -109,7 +110,7 @@ public class TransportBulkAction extends BaseAction { createIndexAction.execute(new CreateIndexRequest(index).cause("auto(bulk api)"), new ActionListener() { @Override public void onResponse(CreateIndexResponse result) { if (counter.decrementAndGet() == 0) { - executeBulk(bulkRequest, listener); + executeBulk(bulkRequest, startTime, listener); } } @@ -117,7 +118,7 @@ public class TransportBulkAction extends BaseAction { if (ExceptionsHelper.unwrapCause(e) instanceof IndexAlreadyExistsException) { // we have the index, do it if (counter.decrementAndGet() == 0) { - executeBulk(bulkRequest, listener); + executeBulk(bulkRequest, startTime, listener); } } else if (failed.compareAndSet(false, true)) { listener.onFailure(e); @@ -126,16 +127,16 @@ public class TransportBulkAction extends BaseAction { }); } else { if (counter.decrementAndGet() == 0) { - executeBulk(bulkRequest, listener); + executeBulk(bulkRequest, startTime, listener); } } } } else { - executeBulk(bulkRequest, listener); + executeBulk(bulkRequest, startTime, listener); } } - private void executeBulk(final BulkRequest bulkRequest, final ActionListener listener) { + private void executeBulk(final BulkRequest bulkRequest, final long startTime, final ActionListener listener) { ClusterState clusterState = clusterService.state(); for (ActionRequest request : bulkRequest.requests) { if (request instanceof IndexRequest) { @@ -251,11 +252,11 @@ public class TransportBulkAction extends BaseAction { if (bulkRequest.listenerThreaded()) { threadPool.execute(new Runnable() { @Override public void run() { - listener.onResponse(new BulkResponse(responses)); + listener.onResponse(new BulkResponse(responses, System.currentTimeMillis() - startTime)); } }); } else { - listener.onResponse(new BulkResponse(responses)); + listener.onResponse(new BulkResponse(responses, System.currentTimeMillis() - startTime)); } } }); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/bulk/RestBulkAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/bulk/RestBulkAction.java index 19934abbde4..47cb2d188b1 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/bulk/RestBulkAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/bulk/RestBulkAction.java @@ -88,13 +88,14 @@ public class RestBulkAction extends BaseRestHandler { try { XContentBuilder builder = restContentBuilder(request); builder.startObject(); - + builder.field(Fields.TOOK, response.tookInMillis()); builder.startArray(Fields.ITEMS); for (BulkItemResponse itemResponse : response) { builder.startObject(); builder.startObject(itemResponse.opType()); builder.field(Fields._INDEX, itemResponse.index()); builder.field(Fields._TYPE, itemResponse.type()); + builder.field(Fields._ID, itemResponse.id()); long version = itemResponse.version(); if (version != -1) { builder.field(Fields._VERSION, itemResponse.version()); @@ -133,6 +134,7 @@ public class RestBulkAction extends BaseRestHandler { static final XContentBuilderString _ID = new XContentBuilderString("_id"); static final XContentBuilderString ERROR = new XContentBuilderString("error"); static final XContentBuilderString OK = new XContentBuilderString("ok"); + static final XContentBuilderString TOOK = new XContentBuilderString("took"); static final XContentBuilderString _VERSION = new XContentBuilderString("_version"); }