diff --git a/core/src/main/java/org/elasticsearch/action/bulk/BulkResponse.java b/core/src/main/java/org/elasticsearch/action/bulk/BulkResponse.java index 2076086a8a9..cd93735b642 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/BulkResponse.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/BulkResponse.java @@ -35,31 +35,53 @@ import java.util.Iterator; */ public class BulkResponse extends ActionResponse implements Iterable { + public final static long NO_INGEST_TOOK = -1L; + private BulkItemResponse[] responses; private long tookInMillis; + private long ingestTookInMillis; BulkResponse() { } public BulkResponse(BulkItemResponse[] responses, long tookInMillis) { + this(responses, tookInMillis, NO_INGEST_TOOK); + } + + public BulkResponse(BulkItemResponse[] responses, long tookInMillis, long ingestTookInMillis) { this.responses = responses; this.tookInMillis = tookInMillis; + this.ingestTookInMillis = ingestTookInMillis; } /** - * How long the bulk execution took. + * How long the bulk execution took. Excluding ingest preprocessing. */ public TimeValue getTook() { return new TimeValue(tookInMillis); } /** - * How long the bulk execution took in milliseconds. + * How long the bulk execution took in milliseconds. Excluding ingest preprocessing. */ public long getTookInMillis() { return tookInMillis; } + /** + * If ingest is enabled returns the bulk ingest preprocessing time, otherwise 0 is returned. + */ + public TimeValue getIngestTook() { + return new TimeValue(ingestTookInMillis); + } + + /** + * If ingest is enabled returns the bulk ingest preprocessing time. in milliseconds, otherwise -1 is returned. + */ + public long getIngestTookInMillis() { + return ingestTookInMillis; + } + /** * Has anything failed with the execution. */ @@ -106,6 +128,7 @@ public class BulkResponse extends ActionResponse implements Iterable listener) { + long ingestStartTimeInNanos = System.nanoTime(); BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(original); executionService.executeBulkRequest(() -> bulkRequestModifier, (indexRequest, throwable) -> { logger.debug("failed to execute pipeline [{}] for document [{}/{}/{}]", throwable, indexRequest.getPipeline(), indexRequest.index(), indexRequest.type(), indexRequest.id()); @@ -110,8 +112,9 @@ public final class IngestActionFilter extends AbstractComponent implements Actio logger.error("failed to execute pipeline for a bulk request", throwable); listener.onFailure(throwable); } else { + long ingestTookInMillis = TimeUnit.MILLISECONDS.convert(System.nanoTime() - ingestStartTimeInNanos, TimeUnit.NANOSECONDS); BulkRequest bulkRequest = bulkRequestModifier.getBulkRequest(); - ActionListener actionListener = bulkRequestModifier.wrapActionListenerIfNeeded(listener); + ActionListener actionListener = bulkRequestModifier.wrapActionListenerIfNeeded(ingestTookInMillis, listener); if (bulkRequest.requests().isEmpty()) { // at this stage, the transport bulk action can't deal with a bulk request with no requests, // so we stop and send an empty response back to the client. @@ -176,11 +179,21 @@ public final class IngestActionFilter extends AbstractComponent implements Actio } } - ActionListener wrapActionListenerIfNeeded(ActionListener actionListener) { + ActionListener wrapActionListenerIfNeeded(long ingestTookInMillis, ActionListener actionListener) { if (itemResponses.isEmpty()) { - return actionListener; + return new ActionListener() { + @Override + public void onResponse(BulkResponse response) { + actionListener.onResponse(new BulkResponse(response.getItems(), response.getTookInMillis(), ingestTookInMillis)); + } + + @Override + public void onFailure(Throwable e) { + actionListener.onFailure(e); + } + }; } else { - return new IngestBulkResponseListener(originalSlots, itemResponses, actionListener); + return new IngestBulkResponseListener(ingestTookInMillis, originalSlots, itemResponses, actionListener); } } @@ -197,24 +210,26 @@ public final class IngestActionFilter extends AbstractComponent implements Actio } - private final static class IngestBulkResponseListener implements ActionListener { + final static class IngestBulkResponseListener implements ActionListener { + private final long ingestTookInMillis; private final int[] originalSlots; private final List itemResponses; private final ActionListener actionListener; - IngestBulkResponseListener(int[] originalSlots, List itemResponses, ActionListener actionListener) { + IngestBulkResponseListener(long ingestTookInMillis, int[] originalSlots, List itemResponses, ActionListener actionListener) { + this.ingestTookInMillis = ingestTookInMillis; this.itemResponses = itemResponses; this.actionListener = actionListener; this.originalSlots = originalSlots; } @Override - public void onResponse(BulkResponse bulkItemResponses) { - for (int i = 0; i < bulkItemResponses.getItems().length; i++) { - itemResponses.add(originalSlots[i], bulkItemResponses.getItems()[i]); + public void onResponse(BulkResponse response) { + for (int i = 0; i < response.getItems().length; i++) { + itemResponses.add(originalSlots[i], response.getItems()[i]); } - actionListener.onResponse(new BulkResponse(itemResponses.toArray(new BulkItemResponse[itemResponses.size()]), bulkItemResponses.getTookInMillis())); + actionListener.onResponse(new BulkResponse(itemResponses.toArray(new BulkItemResponse[itemResponses.size()]), response.getTookInMillis(), ingestTookInMillis)); } @Override diff --git a/core/src/main/java/org/elasticsearch/rest/action/bulk/RestBulkAction.java b/core/src/main/java/org/elasticsearch/rest/action/bulk/RestBulkAction.java index dbc413fcfc0..4c8d3a85989 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/bulk/RestBulkAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/bulk/RestBulkAction.java @@ -93,6 +93,9 @@ public class RestBulkAction extends BaseRestHandler { public RestResponse buildResponse(BulkResponse response, XContentBuilder builder) throws Exception { builder.startObject(); builder.field(Fields.TOOK, response.getTookInMillis()); + if (response.getIngestTookInMillis() != BulkResponse.NO_INGEST_TOOK) { + builder.field(Fields.INGEST_TOOK, response.getIngestTookInMillis()); + } builder.field(Fields.ERRORS, response.hasFailures()); builder.startArray(Fields.ITEMS); for (BulkItemResponse itemResponse : response) { @@ -112,6 +115,7 @@ public class RestBulkAction extends BaseRestHandler { static final XContentBuilderString ITEMS = new XContentBuilderString("items"); static final XContentBuilderString ERRORS = new XContentBuilderString("errors"); static final XContentBuilderString TOOK = new XContentBuilderString("took"); + static final XContentBuilderString INGEST_TOOK = new XContentBuilderString("ingest_took"); } } diff --git a/core/src/test/java/org/elasticsearch/action/ingest/BulkRequestModifierTests.java b/core/src/test/java/org/elasticsearch/action/ingest/BulkRequestModifierTests.java index 66c2a0183e8..3286c07e06c 100644 --- a/core/src/test/java/org/elasticsearch/action/ingest/BulkRequestModifierTests.java +++ b/core/src/test/java/org/elasticsearch/action/ingest/BulkRequestModifierTests.java @@ -65,10 +65,12 @@ public class BulkRequestModifierTests extends ESTestCase { assertThat(bulkRequestModifier.getBulkRequest().requests().size(), equalTo(numRequests - failedSlots.size())); // simulate that we actually executed the modified bulk request: - ActionListener result = bulkRequestModifier.wrapActionListenerIfNeeded(actionListener); + long ingestTook = randomLong(); + ActionListener result = bulkRequestModifier.wrapActionListenerIfNeeded(ingestTook, actionListener); result.onResponse(new BulkResponse(new BulkItemResponse[numRequests - failedSlots.size()], 0)); BulkResponse bulkResponse = actionListener.getResponse(); + assertThat(bulkResponse.getIngestTookInMillis(), equalTo(ingestTook)); for (int j = 0; j < bulkResponse.getItems().length; j++) { if (failedSlots.contains(j)) { BulkItemResponse item = bulkResponse.getItems()[j]; @@ -102,7 +104,7 @@ public class BulkRequestModifierTests extends ESTestCase { assertThat(bulkRequest.requests().size(), Matchers.equalTo(16)); List responses = new ArrayList<>(); - ActionListener bulkResponseListener = modifier.wrapActionListenerIfNeeded(new ActionListener() { + ActionListener bulkResponseListener = modifier.wrapActionListenerIfNeeded(1L, new ActionListener() { @Override public void onResponse(BulkResponse bulkItemResponses) { responses.addAll(Arrays.asList(bulkItemResponses.getItems())); @@ -142,7 +144,7 @@ public class BulkRequestModifierTests extends ESTestCase { assertThat(bulkRequest, Matchers.sameInstance(originalBulkRequest)); @SuppressWarnings("unchecked") ActionListener actionListener = mock(ActionListener.class); - assertThat(modifier.wrapActionListenerIfNeeded(actionListener), Matchers.sameInstance(actionListener)); + assertThat(modifier.wrapActionListenerIfNeeded(1L, actionListener).getClass().isAnonymousClass(), is(true)); } private static class CaptureActionListener implements ActionListener { diff --git a/core/src/test/java/org/elasticsearch/action/ingest/IngestActionFilterTests.java b/core/src/test/java/org/elasticsearch/action/ingest/IngestActionFilterTests.java index 5d7dd1c0ea6..d4bccafecbf 100644 --- a/core/src/test/java/org/elasticsearch/action/ingest/IngestActionFilterTests.java +++ b/core/src/test/java/org/elasticsearch/action/ingest/IngestActionFilterTests.java @@ -32,6 +32,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.ingest.IngestService; import org.elasticsearch.ingest.PipelineExecutionService; import org.elasticsearch.ingest.PipelineStore; +import org.elasticsearch.ingest.TestProcessor; import org.elasticsearch.ingest.core.CompoundProcessor; import org.elasticsearch.ingest.core.IngestDocument; import org.elasticsearch.ingest.core.Pipeline; @@ -47,6 +48,7 @@ import java.util.function.Consumer; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.nullValue; +import static org.mockito.Matchers.eq; import static org.mockito.Matchers.same; import static org.mockito.Mockito.any; import static org.mockito.Mockito.doAnswer; @@ -160,22 +162,7 @@ public class IngestActionFilterTests extends ESTestCase { when(threadPool.executor(any())).thenReturn(Runnable::run); PipelineStore store = mock(PipelineStore.class); - Processor processor = new Processor() { - @Override - public void execute(IngestDocument ingestDocument) { - ingestDocument.setFieldValue("field2", "value2"); - } - - @Override - public String getType() { - return null; - } - - @Override - public String getTag() { - return null; - } - }; + Processor processor = new TestProcessor(ingestDocument -> ingestDocument.setFieldValue("field2", "value2")); when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", new CompoundProcessor(processor))); executionService = new PipelineExecutionService(store, threadPool); IngestService ingestService = mock(IngestService.class); @@ -206,23 +193,20 @@ public class IngestActionFilterTests extends ESTestCase { ActionFilterChain actionFilterChain = mock(ActionFilterChain.class); filter.apply(task, BulkAction.NAME, bulkRequest, actionListener, actionFilterChain); + verify(actionFilterChain).proceed(eq(task), eq(BulkAction.NAME), eq(bulkRequest), any()); + verifyZeroInteractions(actionListener); - assertBusy(() -> { - verify(actionFilterChain).proceed(task, BulkAction.NAME, bulkRequest, actionListener); - verifyZeroInteractions(actionListener); - - int assertedRequests = 0; - for (ActionRequest actionRequest : bulkRequest.requests()) { - if (actionRequest instanceof IndexRequest) { - IndexRequest indexRequest = (IndexRequest) actionRequest; - assertThat(indexRequest.sourceAsMap().size(), equalTo(2)); - assertThat(indexRequest.sourceAsMap().get("field1"), equalTo("value1")); - assertThat(indexRequest.sourceAsMap().get("field2"), equalTo("value2")); - } - assertedRequests++; + int assertedRequests = 0; + for (ActionRequest actionRequest : bulkRequest.requests()) { + if (actionRequest instanceof IndexRequest) { + IndexRequest indexRequest = (IndexRequest) actionRequest; + assertThat(indexRequest.sourceAsMap().size(), equalTo(2)); + assertThat(indexRequest.sourceAsMap().get("field1"), equalTo("value1")); + assertThat(indexRequest.sourceAsMap().get("field2"), equalTo("value2")); } - assertThat(assertedRequests, equalTo(numRequest)); - }); + assertedRequests++; + } + assertThat(assertedRequests, equalTo(numRequest)); } @SuppressWarnings("unchecked") diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/ingest/70_bulk.yaml b/rest-api-spec/src/main/resources/rest-api-spec/test/ingest/70_bulk.yaml index b70f05af67e..b363f018667 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/ingest/70_bulk.yaml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/ingest/70_bulk.yaml @@ -48,6 +48,7 @@ setup: _type: test_type _id: test_id2 - f1: v2 + - gte: { ingest_took: 0 } - do: get: @@ -85,6 +86,8 @@ setup: _id: test_id2 pipeline: pipeline2 - f1: v2 + - gte: { ingest_took: 0 } + - do: get: index: test_index