From 8b1f117e51d92d2add60265e8bb13ba40daa89cb Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Fri, 20 Nov 2015 14:55:07 +0100 Subject: [PATCH] Instead of failing the entire bulk request if the pipeline fails, only fail a bulk item. --- .../ingest/PipelineExecutionService.java | 6 +- .../ingest/transport/IngestActionFilter.java | 136 ++++++++++++++++-- .../elasticsearch/ingest/IngestClientIT.java | 46 ++++++ .../transport/IngestActionFilterTests.java | 128 ++++++++++++++++- 4 files changed, 296 insertions(+), 20 deletions(-) diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineExecutionService.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineExecutionService.java index 82e3a403fd1..4a963beecc8 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineExecutionService.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineExecutionService.java @@ -43,7 +43,7 @@ public class PipelineExecutionService { public void execute(IngestDocument ingestDocument, String pipelineId, Listener listener) { Pipeline pipeline = store.get(pipelineId); if (pipeline == null) { - listener.failed(new IllegalArgumentException(LoggerMessageFormat.format("pipeline with id [{}] does not exist", pipelineId))); + listener.failed(new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist")); return; } @@ -53,7 +53,7 @@ public class PipelineExecutionService { try { pipeline.execute(ingestDocument); listener.executed(ingestDocument); - } catch (Exception e) { + } catch (Throwable e) { listener.failed(e); } } @@ -64,7 +64,7 @@ public class PipelineExecutionService { void executed(IngestDocument ingestDocument); - void failed(Exception e); + void failed(Throwable e); } diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/IngestActionFilter.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/IngestActionFilter.java index 37099df9b5f..a80d10a18e4 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/IngestActionFilter.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/IngestActionFilter.java @@ -22,7 +22,9 @@ package org.elasticsearch.plugin.ingest.transport; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.ActionFilter; import org.elasticsearch.action.support.ActionFilterChain; @@ -33,10 +35,9 @@ import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.plugin.ingest.IngestPlugin; import org.elasticsearch.plugin.ingest.PipelineExecutionService; -import java.util.Iterator; -import java.util.Map; +import java.util.*; -public class IngestActionFilter extends AbstractComponent implements ActionFilter { +public final class IngestActionFilter extends AbstractComponent implements ActionFilter { private final PipelineExecutionService executionService; @@ -61,7 +62,10 @@ public class IngestActionFilter extends AbstractComponent implements ActionFilte processIndexRequest(action, listener, chain, (IndexRequest) request, pipelineId); } else if (request instanceof BulkRequest) { BulkRequest bulkRequest = (BulkRequest) request; - processBulkIndexRequest(action, listener, chain, bulkRequest, pipelineId, bulkRequest.requests().iterator()); + @SuppressWarnings("unchecked") + ActionListener actionListener = (ActionListener) listener; + BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(bulkRequest); + processBulkIndexRequest(bulkRequestModifier, pipelineId, action, chain, actionListener); } else { chain.proceed(action, request, listener); } @@ -94,22 +98,31 @@ public class IngestActionFilter extends AbstractComponent implements ActionFilte } @Override - public void failed(Exception e) { + public void failed(Throwable e) { logger.error("failed to execute pipeline [{}]", e, pipelineId); listener.onFailure(e); } }); } - void processBulkIndexRequest(String action, ActionListener listener, ActionFilterChain chain, BulkRequest bulkRequest, String pipelineId, Iterator requests) { - if (!requests.hasNext()) { - chain.proceed(action, bulkRequest, listener); + void processBulkIndexRequest(BulkRequestModifier bulkRequestModifier, String pipelineId, String action, ActionFilterChain chain, ActionListener listener) { + if (!bulkRequestModifier.hasNext()) { + BulkRequest bulkRequest = bulkRequestModifier.getBulkRequest(); + ActionListener actionListener = bulkRequestModifier.wrapActionListenerIfNeeded(listener); + if (bulkRequest.requests().isEmpty()) { + // in this stage, the transport bulk action can't deal with a bulk request with no requests, + // so we stop and send a empty response back to the client. + // (this will happen if all preprocessing all items in the bulk failed) + actionListener.onResponse(new BulkResponse(new BulkItemResponse[0], 0)); + } else { + chain.proceed(action, bulkRequest, actionListener); + } return; } - ActionRequest actionRequest = requests.next(); + ActionRequest actionRequest = bulkRequestModifier.next(); if (!(actionRequest instanceof IndexRequest)) { - processBulkIndexRequest(action, listener, chain, bulkRequest, pipelineId, requests); + processBulkIndexRequest(bulkRequestModifier, pipelineId, action, chain, listener); return; } @@ -122,13 +135,14 @@ public class IngestActionFilter extends AbstractComponent implements ActionFilte if (ingestDocument.isModified()) { indexRequest.source(ingestDocument.getSource()); } - processBulkIndexRequest(action, listener, chain, bulkRequest, pipelineId, requests); + processBulkIndexRequest(bulkRequestModifier, pipelineId, action, chain, listener); } @Override - public void failed(Exception e) { - logger.error("failed to execute pipeline [{}]", e, pipelineId); - listener.onFailure(e); + public void failed(Throwable e) { + logger.debug("failed to execute pipeline [{}]", e, pipelineId); + bulkRequestModifier.markCurrentItemAsFailed(e); + processBulkIndexRequest(bulkRequestModifier, pipelineId, action, chain, listener); } }); } @@ -137,4 +151,98 @@ public class IngestActionFilter extends AbstractComponent implements ActionFilte public int order() { return Integer.MAX_VALUE; } + + final static class BulkRequestModifier implements Iterator { + + final BulkRequest bulkRequest; + final Set failedSlots; + final List itemResponses; + + int currentSlot = -1; + int[] originalSlots; + + BulkRequestModifier(BulkRequest bulkRequest) { + this.bulkRequest = bulkRequest; + this.failedSlots = new HashSet<>(); + this.itemResponses = new ArrayList<>(bulkRequest.requests().size()); + } + + @Override + public ActionRequest next() { + return bulkRequest.requests().get(++currentSlot); + } + + @Override + public boolean hasNext() { + return (currentSlot + 1) < bulkRequest.requests().size(); + } + + BulkRequest getBulkRequest() { + if (itemResponses.isEmpty()) { + return bulkRequest; + } else { + BulkRequest modifiedBulkRequest = new BulkRequest(bulkRequest); + modifiedBulkRequest.refresh(bulkRequest.refresh()); + modifiedBulkRequest.consistencyLevel(bulkRequest.consistencyLevel()); + modifiedBulkRequest.timeout(bulkRequest.timeout()); + + int slot = 0; + originalSlots = new int[bulkRequest.requests().size() - failedSlots.size()]; + for (int i = 0; i < bulkRequest.requests().size(); i++) { + ActionRequest request = bulkRequest.requests().get(i); + if (failedSlots.contains(i) == false) { + modifiedBulkRequest.add(request); + originalSlots[slot++] = i; + } + } + return modifiedBulkRequest; + } + } + + ActionListener wrapActionListenerIfNeeded(ActionListener actionListener) { + if (itemResponses.isEmpty()) { + return actionListener; + } else { + return new IngestBulkResponseListener(originalSlots, itemResponses, actionListener); + } + } + + void markCurrentItemAsFailed(Throwable e) { + IndexRequest indexRequest = (IndexRequest) bulkRequest.requests().get(currentSlot); + // We hit a error during preprocessing a request, so we: + // 1) Remember the request item slot from the bulk, so that we're done processing all requests we know what failed + // 2) Add a bulk item failure for this request + // 3) Continue with the next request in the bulk. + failedSlots.add(currentSlot); + BulkItemResponse.Failure failure = new BulkItemResponse.Failure(indexRequest.index(), indexRequest.type(), indexRequest.id(), e); + itemResponses.add(new BulkItemResponse(currentSlot, indexRequest.opType().lowercase(), failure)); + } + + } + + private final static class IngestBulkResponseListener implements ActionListener { + + private final int[] originalSlots; + private final List itemResponses; + private final ActionListener actionListener; + + IngestBulkResponseListener(int[] originalSlots, List itemResponses, ActionListener actionListener) { + this.itemResponses = itemResponses; + this.actionListener = actionListener; + this.originalSlots = originalSlots; + } + + @Override + public void onResponse(BulkResponse bulkItemResponses) { + for (int i = 0; i < bulkItemResponses.getItems().length; i++) { + itemResponses.add(originalSlots[i], bulkItemResponses.getItems()[i]); + } + actionListener.onResponse(new BulkResponse(itemResponses.toArray(new BulkItemResponse[itemResponses.size()]), bulkItemResponses.getTookInMillis())); + } + + @Override + public void onFailure(Throwable e) { + actionListener.onFailure(e); + } + } } diff --git a/plugins/ingest/src/test/java/org/elasticsearch/ingest/IngestClientIT.java b/plugins/ingest/src/test/java/org/elasticsearch/ingest/IngestClientIT.java index a14ca5a31cf..6d4b2bdcf64 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/ingest/IngestClientIT.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/ingest/IngestClientIT.java @@ -19,7 +19,15 @@ package org.elasticsearch.ingest; +import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.plugin.ingest.IngestPlugin; import org.elasticsearch.plugin.ingest.transport.delete.DeletePipelineAction; @@ -39,6 +47,7 @@ import org.elasticsearch.test.ESIntegTestCase; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.Map; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; @@ -111,6 +120,43 @@ public class IngestClientIT extends ESIntegTestCase { assertThat(simulateDocumentSimpleResult.getFailure(), nullValue()); } + public void testBulkWithIngestFailures() { + createIndex("index"); + + int numRequests = scaledRandomIntBetween(32, 128); + BulkRequest bulkRequest = new BulkRequest(); + bulkRequest.putHeader(IngestPlugin.PIPELINE_ID_PARAM, "_none_existing_id"); + for (int i = 0; i < numRequests; i++) { + if (i % 2 == 0) { + UpdateRequest updateRequest = new UpdateRequest("index", "type", Integer.toString(i)); + updateRequest.upsert("field", "value"); + updateRequest.doc(new HashMap()); + bulkRequest.add(updateRequest); + } else { + IndexRequest indexRequest = new IndexRequest("index", "type", Integer.toString(i)); + indexRequest.source("field1", "value1"); + bulkRequest.add(indexRequest); + } + } + + BulkResponse response = client().bulk(bulkRequest).actionGet(); + assertThat(response.getItems().length, equalTo(bulkRequest.requests().size())); + for (int i = 0; i < bulkRequest.requests().size(); i++) { + ActionRequest request = bulkRequest.requests().get(i); + BulkItemResponse itemResponse = response.getItems()[i]; + if (request instanceof IndexRequest) { + BulkItemResponse.Failure failure = itemResponse.getFailure(); + assertThat(failure.getMessage(), equalTo("java.lang.IllegalArgumentException: pipeline with id [_none_existing_id] does not exist")); + } else if (request instanceof UpdateRequest) { + UpdateResponse updateResponse = itemResponse.getResponse(); + assertThat(updateResponse.getId(), equalTo(Integer.toString(i))); + assertThat(updateResponse.isCreated(), is(true)); + } else { + fail("unexpected request item [" + request + "]"); + } + } + } + public void test() throws Exception { new PutPipelineRequestBuilder(client(), PutPipelineAction.INSTANCE) .setId("_id") diff --git a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/IngestActionFilterTests.java b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/IngestActionFilterTests.java index cee6b1b1d78..336c351d497 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/IngestActionFilterTests.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/IngestActionFilterTests.java @@ -21,7 +21,10 @@ package org.elasticsearch.plugin.ingest.transport; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.ActionFilterChain; @@ -40,11 +43,12 @@ import org.junit.Before; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; +import java.util.*; +import static org.elasticsearch.plugin.ingest.transport.IngestActionFilter.*; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.*; @@ -225,4 +229,122 @@ public class IngestActionFilterTests extends ESTestCase { threadPool.shutdown(); } + public void testApplyWithBulkRequestWithFailure() throws Exception { + BulkRequest bulkRequest = new BulkRequest(); + bulkRequest.putHeader(IngestPlugin.PIPELINE_ID_PARAM, "_id"); + int numRequest = scaledRandomIntBetween(8, 64); + int numNonIndexRequests = 0; + for (int i = 0; i < numRequest; i++) { + if (i % 2 == 0) { + numNonIndexRequests++; + ActionRequest request; + if (randomBoolean()) { + request = new DeleteRequest("_index", "_type", "_id"); + } else { + request = new UpdateRequest("_index", "_type", "_id"); + } + bulkRequest.add(request); + } else { + IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id"); + indexRequest.source("field1", "value1"); + bulkRequest.add(indexRequest); + } + } + + RuntimeException exception = new RuntimeException(); + Answer answer = (invocationOnMock) -> { + PipelineExecutionService.Listener listener = (PipelineExecutionService.Listener) invocationOnMock.getArguments()[2]; + listener.failed(exception); + return null; + }; + doAnswer(answer).when(executionService).execute(any(IngestDocument.class), eq("_id"), any(PipelineExecutionService.Listener.class)); + + ActionListener actionListener = mock(ActionListener.class); + RecordRequestAFC actionFilterChain = new RecordRequestAFC(); + + filter.apply("_action", bulkRequest, actionListener, actionFilterChain); + + BulkRequest interceptedRequests = actionFilterChain.getRequest(); + assertThat(interceptedRequests.requests().size(), equalTo(numNonIndexRequests)); + + verifyZeroInteractions(actionListener); + } + + public void testBulkRequestModifier() { + int numRequests = scaledRandomIntBetween(8, 64); + BulkRequest bulkRequest = new BulkRequest(); + for (int i = 0; i < numRequests; i++) { + bulkRequest.add(new IndexRequest("_index", "_type", String.valueOf(i)).source("{}")); + } + CaptureActionListener actionListener = new CaptureActionListener(); + BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(bulkRequest); + + int i = 0; + Set failedSlots = new HashSet<>(); + while (bulkRequestModifier.hasNext()) { + IndexRequest indexRequest = (IndexRequest) bulkRequestModifier.next(); + if (randomBoolean()) { + bulkRequestModifier.markCurrentItemAsFailed(new RuntimeException()); + failedSlots.add(i); + } + i++; + } + + assertThat(bulkRequestModifier.getBulkRequest().requests().size(), equalTo(numRequests - failedSlots.size())); + // simulate that we actually executed the modified bulk request: + ActionListener result = bulkRequestModifier.wrapActionListenerIfNeeded(actionListener); + result.onResponse(new BulkResponse(new BulkItemResponse[numRequests - failedSlots.size()], 0)); + + BulkResponse bulkResponse = actionListener.getResponse(); + for (int j = 0; j < bulkResponse.getItems().length; j++) { + if (failedSlots.contains(j)) { + BulkItemResponse item = bulkResponse.getItems()[j]; + assertThat(item.isFailed(), is(true)); + assertThat(item.getFailure().getIndex(), equalTo("_index")); + assertThat(item.getFailure().getType(), equalTo("_type")); + assertThat(item.getFailure().getId(), equalTo(String.valueOf(j))); + assertThat(item.getFailure().getMessage(), equalTo("java.lang.RuntimeException")); + } else { + assertThat(bulkResponse.getItems()[j], nullValue()); + } + } + } + + private final static class RecordRequestAFC implements ActionFilterChain { + + private ActionRequest request; + + @Override + public void proceed(String action, ActionRequest request, ActionListener listener) { + this.request = request; + } + + @Override + public void proceed(String action, ActionResponse response, ActionListener listener) { + + } + + public > T getRequest() { + return (T) request; + } + } + + private final static class CaptureActionListener implements ActionListener { + + private BulkResponse response; + + @Override + public void onResponse(BulkResponse bulkItemResponses) { + this.response = bulkItemResponses ; + } + + @Override + public void onFailure(Throwable e) { + } + + public BulkResponse getResponse() { + return response; + } + } + }