prevent IndexRequest from being processed multipel times

This commit is contained in:
Martijn van Groningen 2015-10-08 13:27:31 +02:00
parent 82a9ba355d
commit b3ad3f35fa
4 changed files with 29 additions and 5 deletions

View File

@ -36,8 +36,9 @@ import static org.elasticsearch.common.settings.Settings.settingsBuilder;
public class IngestPlugin extends Plugin {
public static final String INGEST_CONTEXT_KEY = "__ingest__";
public static final String INGEST_PAREM_CONTEXT_KEY = "__ingest__";
public static final String INGEST_PARAM = "ingest";
public static final String INGEST_ALREADY_PROCESSED = "ingest_already_processed";
public static final String NAME = "ingest";
private final Settings nodeSettings;

View File

@ -23,7 +23,7 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.rest.*;
import static org.elasticsearch.plugin.ingest.IngestPlugin.*;
import static org.elasticsearch.plugin.ingest.IngestPlugin.INGEST_CONTEXT_KEY;
import static org.elasticsearch.plugin.ingest.IngestPlugin.INGEST_PAREM_CONTEXT_KEY;
public class IngestRestFilter extends RestFilter {
@ -34,7 +34,7 @@ public class IngestRestFilter extends RestFilter {
@Override
public void process(RestRequest request, RestChannel channel, RestFilterChain filterChain) throws Exception {
request.putInContext(INGEST_CONTEXT_KEY, request.param(INGEST_PARAM));
request.putInContext(INGEST_PAREM_CONTEXT_KEY, request.param(INGEST_PARAM));
filterChain.continueProcessing(request, channel);
}
}

View File

@ -48,7 +48,7 @@ public class IngestActionFilter extends AbstractComponent implements ActionFilte
@Override
public void apply(String action, ActionRequest request, ActionListener listener, ActionFilterChain chain) {
String pipelineId = request.getFromContext(IngestPlugin.INGEST_CONTEXT_KEY);
String pipelineId = request.getFromContext(IngestPlugin.INGEST_PAREM_CONTEXT_KEY);
if (pipelineId == null) {
pipelineId = request.getHeader(IngestPlugin.INGEST_PARAM);
if (pipelineId == null) {
@ -73,6 +73,14 @@ public class IngestActionFilter extends AbstractComponent implements ActionFilte
}
void processIndexRequest(String action, ActionListener listener, ActionFilterChain chain, IndexRequest indexRequest, String pipelineId) {
// The IndexRequest has the same type on the node that receives the request and the node that
// processes the primary action. This could lead to a pipeline being executed twice for the same
// index request, hence this check
if (indexRequest.hasHeader(IngestPlugin.INGEST_ALREADY_PROCESSED)) {
chain.proceed(action, indexRequest, listener);
return;
}
Map<String, Object> sourceAsMap = indexRequest.sourceAsMap();
Data data = new Data(indexRequest.index(), indexRequest.type(), indexRequest.id(), sourceAsMap);
executionService.execute(data, pipelineId, new PipelineExecutionService.Listener() {
@ -81,6 +89,7 @@ public class IngestActionFilter extends AbstractComponent implements ActionFilte
if (data.isModified()) {
indexRequest.source(data.getDocument());
}
indexRequest.putHeader(IngestPlugin.INGEST_ALREADY_PROCESSED, true);
chain.proceed(action, indexRequest, listener);
}

View File

@ -82,7 +82,7 @@ public class IngestActionFilterTests extends ESTestCase {
public void testApplyIngestIdViaContext() throws Exception {
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id");
indexRequest.source("field", "value");
indexRequest.putInContext(IngestPlugin.INGEST_CONTEXT_KEY, "_id");
indexRequest.putInContext(IngestPlugin.INGEST_PAREM_CONTEXT_KEY, "_id");
ActionListener actionListener = mock(ActionListener.class);
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
@ -92,6 +92,20 @@ public class IngestActionFilterTests extends ESTestCase {
verifyZeroInteractions(actionFilterChain);
}
public void testApplyAlreadyProcessed() throws Exception {
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id");
indexRequest.source("field", "value");
indexRequest.putHeader(IngestPlugin.INGEST_PARAM, "_id");
indexRequest.putHeader(IngestPlugin.INGEST_ALREADY_PROCESSED, true);
ActionListener actionListener = mock(ActionListener.class);
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
filter.apply("_action", indexRequest, actionListener, actionFilterChain);
verify(actionFilterChain).proceed("_action", indexRequest, actionListener);
verifyZeroInteractions(executionService, actionListener);
}
public void testApply_executed() throws Exception {
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id");
indexRequest.source("field", "value");