Added `ingest_took` to bulk response to indicate how much time was spent on ingest preprocessing.

The `ingest_took` is separate from `took`, which keeps track how much time is spent on indexing/deleting/updating.
The `ingest_took` is only visible in the rest response if at least for one bulk item has ingest enabled.
This commit is contained in:
Martijn van Groningen 2016-03-01 11:02:10 +01:00
parent 79820ea942
commit 75387001df
6 changed files with 78 additions and 46 deletions

View File

@ -35,31 +35,53 @@ import java.util.Iterator;
*/
public class BulkResponse extends ActionResponse implements Iterable<BulkItemResponse> {
public final static long NO_INGEST_TOOK = -1L;
private BulkItemResponse[] responses;
private long tookInMillis;
private long ingestTookInMillis;
BulkResponse() {
}
public BulkResponse(BulkItemResponse[] responses, long tookInMillis) {
this(responses, tookInMillis, NO_INGEST_TOOK);
}
public BulkResponse(BulkItemResponse[] responses, long tookInMillis, long ingestTookInMillis) {
this.responses = responses;
this.tookInMillis = tookInMillis;
this.ingestTookInMillis = ingestTookInMillis;
}
/**
* How long the bulk execution took.
* How long the bulk execution took. Excluding ingest preprocessing.
*/
public TimeValue getTook() {
return new TimeValue(tookInMillis);
}
/**
* How long the bulk execution took in milliseconds.
* How long the bulk execution took in milliseconds. Excluding ingest preprocessing.
*/
public long getTookInMillis() {
return tookInMillis;
}
/**
* If ingest is enabled returns the bulk ingest preprocessing time, otherwise 0 is returned.
*/
public TimeValue getIngestTook() {
return new TimeValue(ingestTookInMillis);
}
/**
* If ingest is enabled returns the bulk ingest preprocessing time. in milliseconds, otherwise -1 is returned.
*/
public long getIngestTookInMillis() {
return ingestTookInMillis;
}
/**
* Has anything failed with the execution.
*/
@ -106,6 +128,7 @@ public class BulkResponse extends ActionResponse implements Iterable<BulkItemRes
responses[i] = BulkItemResponse.readBulkItem(in);
}
tookInMillis = in.readVLong();
ingestTookInMillis = in.readZLong();
}
@Override
@ -116,5 +139,6 @@ public class BulkResponse extends ActionResponse implements Iterable<BulkItemRes
response.writeTo(out);
}
out.writeVLong(tookInMillis);
out.writeZLong(ingestTookInMillis);
}
}

View File

@ -43,6 +43,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
public final class IngestActionFilter extends AbstractComponent implements ActionFilter {
@ -101,6 +102,7 @@ public final class IngestActionFilter extends AbstractComponent implements Actio
}
void processBulkIndexRequest(Task task, BulkRequest original, String action, ActionFilterChain chain, ActionListener<BulkResponse> listener) {
long ingestStartTimeInNanos = System.nanoTime();
BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(original);
executionService.executeBulkRequest(() -> bulkRequestModifier, (indexRequest, throwable) -> {
logger.debug("failed to execute pipeline [{}] for document [{}/{}/{}]", throwable, indexRequest.getPipeline(), indexRequest.index(), indexRequest.type(), indexRequest.id());
@ -110,8 +112,9 @@ public final class IngestActionFilter extends AbstractComponent implements Actio
logger.error("failed to execute pipeline for a bulk request", throwable);
listener.onFailure(throwable);
} else {
long ingestTookInMillis = TimeUnit.MILLISECONDS.convert(System.nanoTime() - ingestStartTimeInNanos, TimeUnit.NANOSECONDS);
BulkRequest bulkRequest = bulkRequestModifier.getBulkRequest();
ActionListener<BulkResponse> actionListener = bulkRequestModifier.wrapActionListenerIfNeeded(listener);
ActionListener<BulkResponse> actionListener = bulkRequestModifier.wrapActionListenerIfNeeded(ingestTookInMillis, listener);
if (bulkRequest.requests().isEmpty()) {
// at this stage, the transport bulk action can't deal with a bulk request with no requests,
// so we stop and send an empty response back to the client.
@ -176,11 +179,21 @@ public final class IngestActionFilter extends AbstractComponent implements Actio
}
}
ActionListener<BulkResponse> wrapActionListenerIfNeeded(ActionListener<BulkResponse> actionListener) {
ActionListener<BulkResponse> wrapActionListenerIfNeeded(long ingestTookInMillis, ActionListener<BulkResponse> actionListener) {
if (itemResponses.isEmpty()) {
return actionListener;
return new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse response) {
actionListener.onResponse(new BulkResponse(response.getItems(), response.getTookInMillis(), ingestTookInMillis));
}
@Override
public void onFailure(Throwable e) {
actionListener.onFailure(e);
}
};
} else {
return new IngestBulkResponseListener(originalSlots, itemResponses, actionListener);
return new IngestBulkResponseListener(ingestTookInMillis, originalSlots, itemResponses, actionListener);
}
}
@ -197,24 +210,26 @@ public final class IngestActionFilter extends AbstractComponent implements Actio
}
private final static class IngestBulkResponseListener implements ActionListener<BulkResponse> {
final static class IngestBulkResponseListener implements ActionListener<BulkResponse> {
private final long ingestTookInMillis;
private final int[] originalSlots;
private final List<BulkItemResponse> itemResponses;
private final ActionListener<BulkResponse> actionListener;
IngestBulkResponseListener(int[] originalSlots, List<BulkItemResponse> itemResponses, ActionListener<BulkResponse> actionListener) {
IngestBulkResponseListener(long ingestTookInMillis, int[] originalSlots, List<BulkItemResponse> itemResponses, ActionListener<BulkResponse> actionListener) {
this.ingestTookInMillis = ingestTookInMillis;
this.itemResponses = itemResponses;
this.actionListener = actionListener;
this.originalSlots = originalSlots;
}
@Override
public void onResponse(BulkResponse bulkItemResponses) {
for (int i = 0; i < bulkItemResponses.getItems().length; i++) {
itemResponses.add(originalSlots[i], bulkItemResponses.getItems()[i]);
public void onResponse(BulkResponse response) {
for (int i = 0; i < response.getItems().length; i++) {
itemResponses.add(originalSlots[i], response.getItems()[i]);
}
actionListener.onResponse(new BulkResponse(itemResponses.toArray(new BulkItemResponse[itemResponses.size()]), bulkItemResponses.getTookInMillis()));
actionListener.onResponse(new BulkResponse(itemResponses.toArray(new BulkItemResponse[itemResponses.size()]), response.getTookInMillis(), ingestTookInMillis));
}
@Override

View File

@ -93,6 +93,9 @@ public class RestBulkAction extends BaseRestHandler {
public RestResponse buildResponse(BulkResponse response, XContentBuilder builder) throws Exception {
builder.startObject();
builder.field(Fields.TOOK, response.getTookInMillis());
if (response.getIngestTookInMillis() != BulkResponse.NO_INGEST_TOOK) {
builder.field(Fields.INGEST_TOOK, response.getIngestTookInMillis());
}
builder.field(Fields.ERRORS, response.hasFailures());
builder.startArray(Fields.ITEMS);
for (BulkItemResponse itemResponse : response) {
@ -112,6 +115,7 @@ public class RestBulkAction extends BaseRestHandler {
static final XContentBuilderString ITEMS = new XContentBuilderString("items");
static final XContentBuilderString ERRORS = new XContentBuilderString("errors");
static final XContentBuilderString TOOK = new XContentBuilderString("took");
static final XContentBuilderString INGEST_TOOK = new XContentBuilderString("ingest_took");
}
}

View File

@ -65,10 +65,12 @@ public class BulkRequestModifierTests extends ESTestCase {
assertThat(bulkRequestModifier.getBulkRequest().requests().size(), equalTo(numRequests - failedSlots.size()));
// simulate that we actually executed the modified bulk request:
ActionListener<BulkResponse> result = bulkRequestModifier.wrapActionListenerIfNeeded(actionListener);
long ingestTook = randomLong();
ActionListener<BulkResponse> result = bulkRequestModifier.wrapActionListenerIfNeeded(ingestTook, actionListener);
result.onResponse(new BulkResponse(new BulkItemResponse[numRequests - failedSlots.size()], 0));
BulkResponse bulkResponse = actionListener.getResponse();
assertThat(bulkResponse.getIngestTookInMillis(), equalTo(ingestTook));
for (int j = 0; j < bulkResponse.getItems().length; j++) {
if (failedSlots.contains(j)) {
BulkItemResponse item = bulkResponse.getItems()[j];
@ -102,7 +104,7 @@ public class BulkRequestModifierTests extends ESTestCase {
assertThat(bulkRequest.requests().size(), Matchers.equalTo(16));
List<BulkItemResponse> responses = new ArrayList<>();
ActionListener<BulkResponse> bulkResponseListener = modifier.wrapActionListenerIfNeeded(new ActionListener<BulkResponse>() {
ActionListener<BulkResponse> bulkResponseListener = modifier.wrapActionListenerIfNeeded(1L, new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse bulkItemResponses) {
responses.addAll(Arrays.asList(bulkItemResponses.getItems()));
@ -142,7 +144,7 @@ public class BulkRequestModifierTests extends ESTestCase {
assertThat(bulkRequest, Matchers.sameInstance(originalBulkRequest));
@SuppressWarnings("unchecked")
ActionListener<BulkResponse> actionListener = mock(ActionListener.class);
assertThat(modifier.wrapActionListenerIfNeeded(actionListener), Matchers.sameInstance(actionListener));
assertThat(modifier.wrapActionListenerIfNeeded(1L, actionListener).getClass().isAnonymousClass(), is(true));
}
private static class CaptureActionListener implements ActionListener<BulkResponse> {

View File

@ -32,6 +32,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.ingest.PipelineExecutionService;
import org.elasticsearch.ingest.PipelineStore;
import org.elasticsearch.ingest.TestProcessor;
import org.elasticsearch.ingest.core.CompoundProcessor;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.Pipeline;
@ -47,6 +48,7 @@ import java.util.function.Consumer;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
@ -160,22 +162,7 @@ public class IngestActionFilterTests extends ESTestCase {
when(threadPool.executor(any())).thenReturn(Runnable::run);
PipelineStore store = mock(PipelineStore.class);
Processor processor = new Processor() {
@Override
public void execute(IngestDocument ingestDocument) {
ingestDocument.setFieldValue("field2", "value2");
}
@Override
public String getType() {
return null;
}
@Override
public String getTag() {
return null;
}
};
Processor processor = new TestProcessor(ingestDocument -> ingestDocument.setFieldValue("field2", "value2"));
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", new CompoundProcessor(processor)));
executionService = new PipelineExecutionService(store, threadPool);
IngestService ingestService = mock(IngestService.class);
@ -206,23 +193,20 @@ public class IngestActionFilterTests extends ESTestCase {
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
filter.apply(task, BulkAction.NAME, bulkRequest, actionListener, actionFilterChain);
verify(actionFilterChain).proceed(eq(task), eq(BulkAction.NAME), eq(bulkRequest), any());
verifyZeroInteractions(actionListener);
assertBusy(() -> {
verify(actionFilterChain).proceed(task, BulkAction.NAME, bulkRequest, actionListener);
verifyZeroInteractions(actionListener);
int assertedRequests = 0;
for (ActionRequest actionRequest : bulkRequest.requests()) {
if (actionRequest instanceof IndexRequest) {
IndexRequest indexRequest = (IndexRequest) actionRequest;
assertThat(indexRequest.sourceAsMap().size(), equalTo(2));
assertThat(indexRequest.sourceAsMap().get("field1"), equalTo("value1"));
assertThat(indexRequest.sourceAsMap().get("field2"), equalTo("value2"));
}
assertedRequests++;
int assertedRequests = 0;
for (ActionRequest actionRequest : bulkRequest.requests()) {
if (actionRequest instanceof IndexRequest) {
IndexRequest indexRequest = (IndexRequest) actionRequest;
assertThat(indexRequest.sourceAsMap().size(), equalTo(2));
assertThat(indexRequest.sourceAsMap().get("field1"), equalTo("value1"));
assertThat(indexRequest.sourceAsMap().get("field2"), equalTo("value2"));
}
assertThat(assertedRequests, equalTo(numRequest));
});
assertedRequests++;
}
assertThat(assertedRequests, equalTo(numRequest));
}
@SuppressWarnings("unchecked")

View File

@ -48,6 +48,7 @@ setup:
_type: test_type
_id: test_id2
- f1: v2
- gte: { ingest_took: 0 }
- do:
get:
@ -85,6 +86,8 @@ setup:
_id: test_id2
pipeline: pipeline2
- f1: v2
- gte: { ingest_took: 0 }
- do:
get:
index: test_index