Instead of failing the entire bulk request if the pipeline fails, only fail a bulk item.
This commit is contained in:
parent
eeb51ce8d0
commit
8b1f117e51
|
@ -43,7 +43,7 @@ public class PipelineExecutionService {
|
|||
public void execute(IngestDocument ingestDocument, String pipelineId, Listener listener) {
|
||||
Pipeline pipeline = store.get(pipelineId);
|
||||
if (pipeline == null) {
|
||||
listener.failed(new IllegalArgumentException(LoggerMessageFormat.format("pipeline with id [{}] does not exist", pipelineId)));
|
||||
listener.failed(new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist"));
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -53,7 +53,7 @@ public class PipelineExecutionService {
|
|||
try {
|
||||
pipeline.execute(ingestDocument);
|
||||
listener.executed(ingestDocument);
|
||||
} catch (Exception e) {
|
||||
} catch (Throwable e) {
|
||||
listener.failed(e);
|
||||
}
|
||||
}
|
||||
|
@ -64,7 +64,7 @@ public class PipelineExecutionService {
|
|||
|
||||
void executed(IngestDocument ingestDocument);
|
||||
|
||||
void failed(Exception e);
|
||||
void failed(Throwable e);
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -22,7 +22,9 @@ package org.elasticsearch.plugin.ingest.transport;
|
|||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionRequest;
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.action.bulk.BulkItemResponse;
|
||||
import org.elasticsearch.action.bulk.BulkRequest;
|
||||
import org.elasticsearch.action.bulk.BulkResponse;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.support.ActionFilter;
|
||||
import org.elasticsearch.action.support.ActionFilterChain;
|
||||
|
@ -33,10 +35,9 @@ import org.elasticsearch.ingest.IngestDocument;
|
|||
import org.elasticsearch.plugin.ingest.IngestPlugin;
|
||||
import org.elasticsearch.plugin.ingest.PipelineExecutionService;
|
||||
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.*;
|
||||
|
||||
public class IngestActionFilter extends AbstractComponent implements ActionFilter {
|
||||
public final class IngestActionFilter extends AbstractComponent implements ActionFilter {
|
||||
|
||||
private final PipelineExecutionService executionService;
|
||||
|
||||
|
@ -61,7 +62,10 @@ public class IngestActionFilter extends AbstractComponent implements ActionFilte
|
|||
processIndexRequest(action, listener, chain, (IndexRequest) request, pipelineId);
|
||||
} else if (request instanceof BulkRequest) {
|
||||
BulkRequest bulkRequest = (BulkRequest) request;
|
||||
processBulkIndexRequest(action, listener, chain, bulkRequest, pipelineId, bulkRequest.requests().iterator());
|
||||
@SuppressWarnings("unchecked")
|
||||
ActionListener<BulkResponse> actionListener = (ActionListener<BulkResponse>) listener;
|
||||
BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(bulkRequest);
|
||||
processBulkIndexRequest(bulkRequestModifier, pipelineId, action, chain, actionListener);
|
||||
} else {
|
||||
chain.proceed(action, request, listener);
|
||||
}
|
||||
|
@ -94,22 +98,31 @@ public class IngestActionFilter extends AbstractComponent implements ActionFilte
|
|||
}
|
||||
|
||||
@Override
|
||||
public void failed(Exception e) {
|
||||
public void failed(Throwable e) {
|
||||
logger.error("failed to execute pipeline [{}]", e, pipelineId);
|
||||
listener.onFailure(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void processBulkIndexRequest(String action, ActionListener listener, ActionFilterChain chain, BulkRequest bulkRequest, String pipelineId, Iterator<ActionRequest> requests) {
|
||||
if (!requests.hasNext()) {
|
||||
chain.proceed(action, bulkRequest, listener);
|
||||
void processBulkIndexRequest(BulkRequestModifier bulkRequestModifier, String pipelineId, String action, ActionFilterChain chain, ActionListener<BulkResponse> listener) {
|
||||
if (!bulkRequestModifier.hasNext()) {
|
||||
BulkRequest bulkRequest = bulkRequestModifier.getBulkRequest();
|
||||
ActionListener<BulkResponse> actionListener = bulkRequestModifier.wrapActionListenerIfNeeded(listener);
|
||||
if (bulkRequest.requests().isEmpty()) {
|
||||
// in this stage, the transport bulk action can't deal with a bulk request with no requests,
|
||||
// so we stop and send a empty response back to the client.
|
||||
// (this will happen if all preprocessing all items in the bulk failed)
|
||||
actionListener.onResponse(new BulkResponse(new BulkItemResponse[0], 0));
|
||||
} else {
|
||||
chain.proceed(action, bulkRequest, actionListener);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
ActionRequest actionRequest = requests.next();
|
||||
ActionRequest actionRequest = bulkRequestModifier.next();
|
||||
if (!(actionRequest instanceof IndexRequest)) {
|
||||
processBulkIndexRequest(action, listener, chain, bulkRequest, pipelineId, requests);
|
||||
processBulkIndexRequest(bulkRequestModifier, pipelineId, action, chain, listener);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -122,13 +135,14 @@ public class IngestActionFilter extends AbstractComponent implements ActionFilte
|
|||
if (ingestDocument.isModified()) {
|
||||
indexRequest.source(ingestDocument.getSource());
|
||||
}
|
||||
processBulkIndexRequest(action, listener, chain, bulkRequest, pipelineId, requests);
|
||||
processBulkIndexRequest(bulkRequestModifier, pipelineId, action, chain, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void failed(Exception e) {
|
||||
logger.error("failed to execute pipeline [{}]", e, pipelineId);
|
||||
listener.onFailure(e);
|
||||
public void failed(Throwable e) {
|
||||
logger.debug("failed to execute pipeline [{}]", e, pipelineId);
|
||||
bulkRequestModifier.markCurrentItemAsFailed(e);
|
||||
processBulkIndexRequest(bulkRequestModifier, pipelineId, action, chain, listener);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -137,4 +151,98 @@ public class IngestActionFilter extends AbstractComponent implements ActionFilte
|
|||
public int order() {
|
||||
return Integer.MAX_VALUE;
|
||||
}
|
||||
|
||||
final static class BulkRequestModifier implements Iterator<ActionRequest> {
|
||||
|
||||
final BulkRequest bulkRequest;
|
||||
final Set<Integer> failedSlots;
|
||||
final List<BulkItemResponse> itemResponses;
|
||||
|
||||
int currentSlot = -1;
|
||||
int[] originalSlots;
|
||||
|
||||
BulkRequestModifier(BulkRequest bulkRequest) {
|
||||
this.bulkRequest = bulkRequest;
|
||||
this.failedSlots = new HashSet<>();
|
||||
this.itemResponses = new ArrayList<>(bulkRequest.requests().size());
|
||||
}
|
||||
|
||||
@Override
|
||||
public ActionRequest next() {
|
||||
return bulkRequest.requests().get(++currentSlot);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasNext() {
|
||||
return (currentSlot + 1) < bulkRequest.requests().size();
|
||||
}
|
||||
|
||||
BulkRequest getBulkRequest() {
|
||||
if (itemResponses.isEmpty()) {
|
||||
return bulkRequest;
|
||||
} else {
|
||||
BulkRequest modifiedBulkRequest = new BulkRequest(bulkRequest);
|
||||
modifiedBulkRequest.refresh(bulkRequest.refresh());
|
||||
modifiedBulkRequest.consistencyLevel(bulkRequest.consistencyLevel());
|
||||
modifiedBulkRequest.timeout(bulkRequest.timeout());
|
||||
|
||||
int slot = 0;
|
||||
originalSlots = new int[bulkRequest.requests().size() - failedSlots.size()];
|
||||
for (int i = 0; i < bulkRequest.requests().size(); i++) {
|
||||
ActionRequest request = bulkRequest.requests().get(i);
|
||||
if (failedSlots.contains(i) == false) {
|
||||
modifiedBulkRequest.add(request);
|
||||
originalSlots[slot++] = i;
|
||||
}
|
||||
}
|
||||
return modifiedBulkRequest;
|
||||
}
|
||||
}
|
||||
|
||||
ActionListener<BulkResponse> wrapActionListenerIfNeeded(ActionListener<BulkResponse> actionListener) {
|
||||
if (itemResponses.isEmpty()) {
|
||||
return actionListener;
|
||||
} else {
|
||||
return new IngestBulkResponseListener(originalSlots, itemResponses, actionListener);
|
||||
}
|
||||
}
|
||||
|
||||
void markCurrentItemAsFailed(Throwable e) {
|
||||
IndexRequest indexRequest = (IndexRequest) bulkRequest.requests().get(currentSlot);
|
||||
// We hit a error during preprocessing a request, so we:
|
||||
// 1) Remember the request item slot from the bulk, so that we're done processing all requests we know what failed
|
||||
// 2) Add a bulk item failure for this request
|
||||
// 3) Continue with the next request in the bulk.
|
||||
failedSlots.add(currentSlot);
|
||||
BulkItemResponse.Failure failure = new BulkItemResponse.Failure(indexRequest.index(), indexRequest.type(), indexRequest.id(), e);
|
||||
itemResponses.add(new BulkItemResponse(currentSlot, indexRequest.opType().lowercase(), failure));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private final static class IngestBulkResponseListener implements ActionListener<BulkResponse> {
|
||||
|
||||
private final int[] originalSlots;
|
||||
private final List<BulkItemResponse> itemResponses;
|
||||
private final ActionListener<BulkResponse> actionListener;
|
||||
|
||||
IngestBulkResponseListener(int[] originalSlots, List<BulkItemResponse> itemResponses, ActionListener<BulkResponse> actionListener) {
|
||||
this.itemResponses = itemResponses;
|
||||
this.actionListener = actionListener;
|
||||
this.originalSlots = originalSlots;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onResponse(BulkResponse bulkItemResponses) {
|
||||
for (int i = 0; i < bulkItemResponses.getItems().length; i++) {
|
||||
itemResponses.add(originalSlots[i], bulkItemResponses.getItems()[i]);
|
||||
}
|
||||
actionListener.onResponse(new BulkResponse(itemResponses.toArray(new BulkItemResponse[itemResponses.size()]), bulkItemResponses.getTookInMillis()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable e) {
|
||||
actionListener.onFailure(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,7 +19,15 @@
|
|||
|
||||
package org.elasticsearch.ingest;
|
||||
|
||||
import org.elasticsearch.action.ActionRequest;
|
||||
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
|
||||
import org.elasticsearch.action.bulk.BulkItemResponse;
|
||||
import org.elasticsearch.action.bulk.BulkRequest;
|
||||
import org.elasticsearch.action.bulk.BulkResponse;
|
||||
import org.elasticsearch.action.delete.DeleteRequest;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.update.UpdateRequest;
|
||||
import org.elasticsearch.action.update.UpdateResponse;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.plugin.ingest.IngestPlugin;
|
||||
import org.elasticsearch.plugin.ingest.transport.delete.DeletePipelineAction;
|
||||
|
@ -39,6 +47,7 @@ import org.elasticsearch.test.ESIntegTestCase;
|
|||
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
||||
|
@ -111,6 +120,43 @@ public class IngestClientIT extends ESIntegTestCase {
|
|||
assertThat(simulateDocumentSimpleResult.getFailure(), nullValue());
|
||||
}
|
||||
|
||||
public void testBulkWithIngestFailures() {
|
||||
createIndex("index");
|
||||
|
||||
int numRequests = scaledRandomIntBetween(32, 128);
|
||||
BulkRequest bulkRequest = new BulkRequest();
|
||||
bulkRequest.putHeader(IngestPlugin.PIPELINE_ID_PARAM, "_none_existing_id");
|
||||
for (int i = 0; i < numRequests; i++) {
|
||||
if (i % 2 == 0) {
|
||||
UpdateRequest updateRequest = new UpdateRequest("index", "type", Integer.toString(i));
|
||||
updateRequest.upsert("field", "value");
|
||||
updateRequest.doc(new HashMap());
|
||||
bulkRequest.add(updateRequest);
|
||||
} else {
|
||||
IndexRequest indexRequest = new IndexRequest("index", "type", Integer.toString(i));
|
||||
indexRequest.source("field1", "value1");
|
||||
bulkRequest.add(indexRequest);
|
||||
}
|
||||
}
|
||||
|
||||
BulkResponse response = client().bulk(bulkRequest).actionGet();
|
||||
assertThat(response.getItems().length, equalTo(bulkRequest.requests().size()));
|
||||
for (int i = 0; i < bulkRequest.requests().size(); i++) {
|
||||
ActionRequest request = bulkRequest.requests().get(i);
|
||||
BulkItemResponse itemResponse = response.getItems()[i];
|
||||
if (request instanceof IndexRequest) {
|
||||
BulkItemResponse.Failure failure = itemResponse.getFailure();
|
||||
assertThat(failure.getMessage(), equalTo("java.lang.IllegalArgumentException: pipeline with id [_none_existing_id] does not exist"));
|
||||
} else if (request instanceof UpdateRequest) {
|
||||
UpdateResponse updateResponse = itemResponse.getResponse();
|
||||
assertThat(updateResponse.getId(), equalTo(Integer.toString(i)));
|
||||
assertThat(updateResponse.isCreated(), is(true));
|
||||
} else {
|
||||
fail("unexpected request item [" + request + "]");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void test() throws Exception {
|
||||
new PutPipelineRequestBuilder(client(), PutPipelineAction.INSTANCE)
|
||||
.setId("_id")
|
||||
|
|
|
@ -21,7 +21,10 @@ package org.elasticsearch.plugin.ingest.transport;
|
|||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionRequest;
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.action.bulk.BulkItemResponse;
|
||||
import org.elasticsearch.action.bulk.BulkRequest;
|
||||
import org.elasticsearch.action.bulk.BulkResponse;
|
||||
import org.elasticsearch.action.delete.DeleteRequest;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.support.ActionFilterChain;
|
||||
|
@ -40,11 +43,12 @@ import org.junit.Before;
|
|||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.*;
|
||||
|
||||
import static org.elasticsearch.plugin.ingest.transport.IngestActionFilter.*;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.eq;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
@ -225,4 +229,122 @@ public class IngestActionFilterTests extends ESTestCase {
|
|||
threadPool.shutdown();
|
||||
}
|
||||
|
||||
public void testApplyWithBulkRequestWithFailure() throws Exception {
|
||||
BulkRequest bulkRequest = new BulkRequest();
|
||||
bulkRequest.putHeader(IngestPlugin.PIPELINE_ID_PARAM, "_id");
|
||||
int numRequest = scaledRandomIntBetween(8, 64);
|
||||
int numNonIndexRequests = 0;
|
||||
for (int i = 0; i < numRequest; i++) {
|
||||
if (i % 2 == 0) {
|
||||
numNonIndexRequests++;
|
||||
ActionRequest request;
|
||||
if (randomBoolean()) {
|
||||
request = new DeleteRequest("_index", "_type", "_id");
|
||||
} else {
|
||||
request = new UpdateRequest("_index", "_type", "_id");
|
||||
}
|
||||
bulkRequest.add(request);
|
||||
} else {
|
||||
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id");
|
||||
indexRequest.source("field1", "value1");
|
||||
bulkRequest.add(indexRequest);
|
||||
}
|
||||
}
|
||||
|
||||
RuntimeException exception = new RuntimeException();
|
||||
Answer answer = (invocationOnMock) -> {
|
||||
PipelineExecutionService.Listener listener = (PipelineExecutionService.Listener) invocationOnMock.getArguments()[2];
|
||||
listener.failed(exception);
|
||||
return null;
|
||||
};
|
||||
doAnswer(answer).when(executionService).execute(any(IngestDocument.class), eq("_id"), any(PipelineExecutionService.Listener.class));
|
||||
|
||||
ActionListener actionListener = mock(ActionListener.class);
|
||||
RecordRequestAFC actionFilterChain = new RecordRequestAFC();
|
||||
|
||||
filter.apply("_action", bulkRequest, actionListener, actionFilterChain);
|
||||
|
||||
BulkRequest interceptedRequests = actionFilterChain.getRequest();
|
||||
assertThat(interceptedRequests.requests().size(), equalTo(numNonIndexRequests));
|
||||
|
||||
verifyZeroInteractions(actionListener);
|
||||
}
|
||||
|
||||
public void testBulkRequestModifier() {
|
||||
int numRequests = scaledRandomIntBetween(8, 64);
|
||||
BulkRequest bulkRequest = new BulkRequest();
|
||||
for (int i = 0; i < numRequests; i++) {
|
||||
bulkRequest.add(new IndexRequest("_index", "_type", String.valueOf(i)).source("{}"));
|
||||
}
|
||||
CaptureActionListener actionListener = new CaptureActionListener();
|
||||
BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(bulkRequest);
|
||||
|
||||
int i = 0;
|
||||
Set<Integer> failedSlots = new HashSet<>();
|
||||
while (bulkRequestModifier.hasNext()) {
|
||||
IndexRequest indexRequest = (IndexRequest) bulkRequestModifier.next();
|
||||
if (randomBoolean()) {
|
||||
bulkRequestModifier.markCurrentItemAsFailed(new RuntimeException());
|
||||
failedSlots.add(i);
|
||||
}
|
||||
i++;
|
||||
}
|
||||
|
||||
assertThat(bulkRequestModifier.getBulkRequest().requests().size(), equalTo(numRequests - failedSlots.size()));
|
||||
// simulate that we actually executed the modified bulk request:
|
||||
ActionListener<BulkResponse> result = bulkRequestModifier.wrapActionListenerIfNeeded(actionListener);
|
||||
result.onResponse(new BulkResponse(new BulkItemResponse[numRequests - failedSlots.size()], 0));
|
||||
|
||||
BulkResponse bulkResponse = actionListener.getResponse();
|
||||
for (int j = 0; j < bulkResponse.getItems().length; j++) {
|
||||
if (failedSlots.contains(j)) {
|
||||
BulkItemResponse item = bulkResponse.getItems()[j];
|
||||
assertThat(item.isFailed(), is(true));
|
||||
assertThat(item.getFailure().getIndex(), equalTo("_index"));
|
||||
assertThat(item.getFailure().getType(), equalTo("_type"));
|
||||
assertThat(item.getFailure().getId(), equalTo(String.valueOf(j)));
|
||||
assertThat(item.getFailure().getMessage(), equalTo("java.lang.RuntimeException"));
|
||||
} else {
|
||||
assertThat(bulkResponse.getItems()[j], nullValue());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private final static class RecordRequestAFC implements ActionFilterChain {
|
||||
|
||||
private ActionRequest request;
|
||||
|
||||
@Override
|
||||
public void proceed(String action, ActionRequest request, ActionListener listener) {
|
||||
this.request = request;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void proceed(String action, ActionResponse response, ActionListener listener) {
|
||||
|
||||
}
|
||||
|
||||
public <T extends ActionRequest<T>> T getRequest() {
|
||||
return (T) request;
|
||||
}
|
||||
}
|
||||
|
||||
private final static class CaptureActionListener implements ActionListener<BulkResponse> {
|
||||
|
||||
private BulkResponse response;
|
||||
|
||||
@Override
|
||||
public void onResponse(BulkResponse bulkItemResponses) {
|
||||
this.response = bulkItemResponses ;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable e) {
|
||||
}
|
||||
|
||||
public BulkResponse getResponse() {
|
||||
return response;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue