move constants to IngestActionFilter

This commit is contained in:
javanna 2016-01-07 13:40:28 +01:00 committed by Luca Cavanna
parent 2803ae09dc
commit 95bc0ed7a2
6 changed files with 24 additions and 27 deletions

View File

@ -33,7 +33,6 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.ingest.IngestBootstrapper;
import org.elasticsearch.ingest.PipelineExecutionService;
import org.elasticsearch.ingest.core.ConfigurationUtils;
import org.elasticsearch.tasks.Task;
import java.util.ArrayList;
@ -44,6 +43,10 @@ import java.util.Set;
public final class IngestActionFilter extends AbstractComponent implements ActionFilter {
public static final String PIPELINE_ID_PARAM_CONTEXT_KEY = "__pipeline_id__";
public static final String PIPELINE_ID_PARAM = "pipeline";
static final String PIPELINE_ALREADY_PROCESSED = "ingest_already_processed";
private final PipelineExecutionService executionService;
@Inject
@ -54,9 +57,9 @@ public final class IngestActionFilter extends AbstractComponent implements Actio
@Override
public void apply(Task task, String action, ActionRequest request, ActionListener listener, ActionFilterChain chain) {
String pipelineId = request.getFromContext(ConfigurationUtils.PIPELINE_ID_PARAM_CONTEXT_KEY);
String pipelineId = request.getFromContext(PIPELINE_ID_PARAM_CONTEXT_KEY);
if (pipelineId == null) {
pipelineId = request.getHeader(ConfigurationUtils.PIPELINE_ID_PARAM);
pipelineId = request.getHeader(PIPELINE_ID_PARAM);
if (pipelineId == null) {
chain.proceed(task, action, request, listener);
return;
@ -84,7 +87,7 @@ public final class IngestActionFilter extends AbstractComponent implements Actio
// The IndexRequest has the same type on the node that receives the request and the node that
// processes the primary action. This could lead to a pipeline being executed twice for the same
// index request, hence this check
if (indexRequest.hasHeader(ConfigurationUtils.PIPELINE_ALREADY_PROCESSED)) {
if (indexRequest.hasHeader(PIPELINE_ALREADY_PROCESSED)) {
chain.proceed(task, action, indexRequest, listener);
return;
}
@ -92,7 +95,7 @@ public final class IngestActionFilter extends AbstractComponent implements Actio
logger.error("failed to execute pipeline [{}]", t, pipelineId);
listener.onFailure(t);
}, success -> {
indexRequest.putHeader(ConfigurationUtils.PIPELINE_ALREADY_PROCESSED, true);
indexRequest.putHeader(PIPELINE_ALREADY_PROCESSED, true);
chain.proceed(task, action, indexRequest, listener);
});
}

View File

@ -24,17 +24,16 @@ import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.ActionFilter;
import org.elasticsearch.action.support.ActionFilterChain;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.ingest.core.ConfigurationUtils;
public final class IngestDisabledActionFilter implements ActionFilter {
@Override
public void apply(Task task, String action, ActionRequest request, ActionListener listener, ActionFilterChain chain) {
String pipelineId = request.getFromContext(ConfigurationUtils.PIPELINE_ID_PARAM_CONTEXT_KEY);
String pipelineId = request.getFromContext(IngestActionFilter.PIPELINE_ID_PARAM_CONTEXT_KEY);
if (pipelineId != null) {
failRequest(pipelineId);
}
pipelineId = request.getHeader(ConfigurationUtils.PIPELINE_ID_PARAM);
pipelineId = request.getHeader(IngestActionFilter.PIPELINE_ID_PARAM);
if (pipelineId != null) {
failRequest(pipelineId);
}

View File

@ -24,10 +24,6 @@ import java.util.Map;
public final class ConfigurationUtils {
public static final String PIPELINE_ID_PARAM_CONTEXT_KEY = "__pipeline_id__";
public static final String PIPELINE_ID_PARAM = "pipeline";
public static final String PIPELINE_ALREADY_PROCESSED = "ingest_already_processed";
private ConfigurationUtils() {
}

View File

@ -19,8 +19,8 @@
package org.elasticsearch.rest.action.ingest;
import org.elasticsearch.action.ingest.IngestActionFilter;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.ingest.core.ConfigurationUtils;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestFilter;
@ -36,8 +36,8 @@ public class IngestRestFilter extends RestFilter {
@Override
public void process(RestRequest request, RestChannel channel, RestFilterChain filterChain) throws Exception {
if (request.hasParam(ConfigurationUtils.PIPELINE_ID_PARAM)) {
request.putInContext(ConfigurationUtils.PIPELINE_ID_PARAM_CONTEXT_KEY, request.param(ConfigurationUtils.PIPELINE_ID_PARAM));
if (request.hasParam(IngestActionFilter.PIPELINE_ID_PARAM)) {
request.putInContext(IngestActionFilter.PIPELINE_ID_PARAM_CONTEXT_KEY, request.param(IngestActionFilter.PIPELINE_ID_PARAM));
}
filterChain.continueProcessing(request, channel);
}

View File

@ -33,7 +33,6 @@ import org.elasticsearch.ingest.IngestBootstrapper;
import org.elasticsearch.ingest.PipelineExecutionService;
import org.elasticsearch.ingest.PipelineStore;
import org.elasticsearch.ingest.core.CompoundProcessor;
import org.elasticsearch.ingest.core.ConfigurationUtils;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.Pipeline;
import org.elasticsearch.ingest.core.Processor;
@ -89,7 +88,7 @@ public class IngestActionFilterTests extends ESTestCase {
Task task = mock(Task.class);
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id");
indexRequest.source("field", "value");
indexRequest.putHeader(ConfigurationUtils.PIPELINE_ID_PARAM, "_id");
indexRequest.putHeader(IngestActionFilter.PIPELINE_ID_PARAM, "_id");
ActionListener actionListener = mock(ActionListener.class);
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
@ -103,7 +102,7 @@ public class IngestActionFilterTests extends ESTestCase {
Task task = mock(Task.class);
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id");
indexRequest.source("field", "value");
indexRequest.putInContext(ConfigurationUtils.PIPELINE_ID_PARAM_CONTEXT_KEY, "_id");
indexRequest.putInContext(IngestActionFilter.PIPELINE_ID_PARAM_CONTEXT_KEY, "_id");
ActionListener actionListener = mock(ActionListener.class);
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
@ -117,8 +116,8 @@ public class IngestActionFilterTests extends ESTestCase {
Task task = mock(Task.class);
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id");
indexRequest.source("field", "value");
indexRequest.putHeader(ConfigurationUtils.PIPELINE_ID_PARAM, "_id");
indexRequest.putHeader(ConfigurationUtils.PIPELINE_ALREADY_PROCESSED, true);
indexRequest.putHeader(IngestActionFilter.PIPELINE_ID_PARAM, "_id");
indexRequest.putHeader(IngestActionFilter.PIPELINE_ALREADY_PROCESSED, true);
ActionListener actionListener = mock(ActionListener.class);
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
@ -132,7 +131,7 @@ public class IngestActionFilterTests extends ESTestCase {
Task task = mock(Task.class);
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id");
indexRequest.source("field", "value");
indexRequest.putHeader(ConfigurationUtils.PIPELINE_ID_PARAM, "_id");
indexRequest.putHeader(IngestActionFilter.PIPELINE_ID_PARAM, "_id");
ActionListener actionListener = mock(ActionListener.class);
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
@ -154,7 +153,7 @@ public class IngestActionFilterTests extends ESTestCase {
Task task = mock(Task.class);
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id");
indexRequest.source("field", "value");
indexRequest.putHeader(ConfigurationUtils.PIPELINE_ID_PARAM, "_id");
indexRequest.putHeader(IngestActionFilter.PIPELINE_ID_PARAM, "_id");
ActionListener actionListener = mock(ActionListener.class);
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
@ -196,7 +195,7 @@ public class IngestActionFilterTests extends ESTestCase {
filter = new IngestActionFilter(Settings.EMPTY, bootstrapper);
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.putHeader(ConfigurationUtils.PIPELINE_ID_PARAM, "_id");
bulkRequest.putHeader(IngestActionFilter.PIPELINE_ID_PARAM, "_id");
int numRequest = scaledRandomIntBetween(8, 64);
for (int i = 0; i < numRequest; i++) {
if (rarely()) {

View File

@ -25,6 +25,7 @@ import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.ingest.IngestActionFilter;
import org.elasticsearch.action.ingest.delete.DeletePipelineAction;
import org.elasticsearch.action.ingest.delete.DeletePipelineRequestBuilder;
import org.elasticsearch.action.ingest.get.GetPipelineAction;
@ -37,7 +38,6 @@ import org.elasticsearch.action.ingest.simulate.SimulatePipelineAction;
import org.elasticsearch.action.ingest.simulate.SimulatePipelineRequestBuilder;
import org.elasticsearch.action.ingest.simulate.SimulatePipelineResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.ingest.core.ConfigurationUtils;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
@ -151,7 +151,7 @@ public class IngestClientIT extends ESIntegTestCase {
int numRequests = scaledRandomIntBetween(32, 128);
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.putHeader(ConfigurationUtils.PIPELINE_ID_PARAM, "_id");
bulkRequest.putHeader(IngestActionFilter.PIPELINE_ID_PARAM, "_id");
for (int i = 0; i < numRequests; i++) {
IndexRequest indexRequest = new IndexRequest("index", "type", Integer.toString(i));
indexRequest.source("field", "value", "fail", i % 2 == 0);
@ -194,7 +194,7 @@ public class IngestClientIT extends ESIntegTestCase {
assertThat(getResponse.pipelines().get(0).getId(), equalTo("_id"));
client().prepareIndex("test", "type", "1").setSource("field", "value", "fail", false)
.putHeader(ConfigurationUtils.PIPELINE_ID_PARAM, "_id")
.putHeader(IngestActionFilter.PIPELINE_ID_PARAM, "_id")
.get();
Map<String, Object> doc = client().prepareGet("test", "type", "1")
@ -204,7 +204,7 @@ public class IngestClientIT extends ESIntegTestCase {
client().prepareBulk().add(
client().prepareIndex("test", "type", "2").setSource("field", "value2", "fail", false)
).putHeader(ConfigurationUtils.PIPELINE_ID_PARAM, "_id").get();
).putHeader(IngestActionFilter.PIPELINE_ID_PARAM, "_id").get();
doc = client().prepareGet("test", "type", "2").get().getSourceAsMap();
assertThat(doc.get("field"), equalTo("value2"));
assertThat(doc.get("processed"), equalTo(true));