Bulk API: Add how long the bulk API took (in milliseconds) to the response, closes #599.

This commit is contained in:
kimchy 2011-01-05 14:12:45 +02:00
parent 29b308005a
commit 6258915205
3 changed files with 44 additions and 9 deletions

View File

@ -23,6 +23,7 @@ import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import java.io.IOException;
import java.util.Iterator;
@ -38,11 +39,42 @@ public class BulkResponse implements ActionResponse, Iterable<BulkItemResponse>
private BulkItemResponse[] responses;
private long tookInMillis;
BulkResponse() {
}
public BulkResponse(BulkItemResponse[] responses) {
public BulkResponse(BulkItemResponse[] responses, long tookInMillis) {
this.responses = responses;
this.tookInMillis = tookInMillis;
}
/**
* How long the bulk execution took.
*/
public TimeValue took() {
return new TimeValue(tookInMillis);
}
/**
* How long the bulk execution took.
*/
public TimeValue getTook() {
return took();
}
/**
* How long the bulk execution took in milliseconds.
*/
public long tookInMillis() {
return tookInMillis;
}
/**
* How long the bulk execution took in milliseconds.
*/
public long getTookInMillis() {
return tookInMillis();
}
/**

View File

@ -86,6 +86,7 @@ public class TransportBulkAction extends BaseAction<BulkRequest, BulkResponse> {
}
@Override protected void doExecute(final BulkRequest bulkRequest, final ActionListener<BulkResponse> listener) {
final long startTime = System.currentTimeMillis();
Set<String> indices = Sets.newHashSet();
for (ActionRequest request : bulkRequest.requests) {
if (request instanceof IndexRequest) {
@ -109,7 +110,7 @@ public class TransportBulkAction extends BaseAction<BulkRequest, BulkResponse> {
createIndexAction.execute(new CreateIndexRequest(index).cause("auto(bulk api)"), new ActionListener<CreateIndexResponse>() {
@Override public void onResponse(CreateIndexResponse result) {
if (counter.decrementAndGet() == 0) {
executeBulk(bulkRequest, listener);
executeBulk(bulkRequest, startTime, listener);
}
}
@ -117,7 +118,7 @@ public class TransportBulkAction extends BaseAction<BulkRequest, BulkResponse> {
if (ExceptionsHelper.unwrapCause(e) instanceof IndexAlreadyExistsException) {
// we have the index, do it
if (counter.decrementAndGet() == 0) {
executeBulk(bulkRequest, listener);
executeBulk(bulkRequest, startTime, listener);
}
} else if (failed.compareAndSet(false, true)) {
listener.onFailure(e);
@ -126,16 +127,16 @@ public class TransportBulkAction extends BaseAction<BulkRequest, BulkResponse> {
});
} else {
if (counter.decrementAndGet() == 0) {
executeBulk(bulkRequest, listener);
executeBulk(bulkRequest, startTime, listener);
}
}
}
} else {
executeBulk(bulkRequest, listener);
executeBulk(bulkRequest, startTime, listener);
}
}
private void executeBulk(final BulkRequest bulkRequest, final ActionListener<BulkResponse> listener) {
private void executeBulk(final BulkRequest bulkRequest, final long startTime, final ActionListener<BulkResponse> listener) {
ClusterState clusterState = clusterService.state();
for (ActionRequest request : bulkRequest.requests) {
if (request instanceof IndexRequest) {
@ -251,11 +252,11 @@ public class TransportBulkAction extends BaseAction<BulkRequest, BulkResponse> {
if (bulkRequest.listenerThreaded()) {
threadPool.execute(new Runnable() {
@Override public void run() {
listener.onResponse(new BulkResponse(responses));
listener.onResponse(new BulkResponse(responses, System.currentTimeMillis() - startTime));
}
});
} else {
listener.onResponse(new BulkResponse(responses));
listener.onResponse(new BulkResponse(responses, System.currentTimeMillis() - startTime));
}
}
});

View File

@ -88,13 +88,14 @@ public class RestBulkAction extends BaseRestHandler {
try {
XContentBuilder builder = restContentBuilder(request);
builder.startObject();
builder.field(Fields.TOOK, response.tookInMillis());
builder.startArray(Fields.ITEMS);
for (BulkItemResponse itemResponse : response) {
builder.startObject();
builder.startObject(itemResponse.opType());
builder.field(Fields._INDEX, itemResponse.index());
builder.field(Fields._TYPE, itemResponse.type());
builder.field(Fields._ID, itemResponse.id());
long version = itemResponse.version();
if (version != -1) {
builder.field(Fields._VERSION, itemResponse.version());
@ -133,6 +134,7 @@ public class RestBulkAction extends BaseRestHandler {
static final XContentBuilderString _ID = new XContentBuilderString("_id");
static final XContentBuilderString ERROR = new XContentBuilderString("error");
static final XContentBuilderString OK = new XContentBuilderString("ok");
static final XContentBuilderString TOOK = new XContentBuilderString("took");
static final XContentBuilderString _VERSION = new XContentBuilderString("_version");
}