rename the `ingest` parameter to `pipeline_id` param, because it is more descriptive what the parameter should hold.
This commit is contained in:
parent
1a4b5bba2b
commit
92452ff99a
|
@ -50,9 +50,9 @@ import static org.elasticsearch.common.settings.Settings.settingsBuilder;
|
|||
|
||||
public class IngestPlugin extends Plugin {
|
||||
|
||||
public static final String INGEST_PARAM_CONTEXT_KEY = "__ingest__";
|
||||
public static final String INGEST_PARAM = "ingest";
|
||||
public static final String INGEST_ALREADY_PROCESSED = "ingest_already_processed";
|
||||
public static final String PIPELINE_ID_PARAM_CONTEXT_KEY = "__pipeline_id__";
|
||||
public static final String PIPELINE_ID_PARAM = "pipeline_id";
|
||||
public static final String PIPELINE_ALREADY_PROCESSED = "ingest_already_processed";
|
||||
public static final String NAME = "ingest";
|
||||
|
||||
private final Settings nodeSettings;
|
||||
|
|
|
@ -23,7 +23,7 @@ import org.elasticsearch.common.inject.Inject;
|
|||
import org.elasticsearch.rest.*;
|
||||
|
||||
import static org.elasticsearch.plugin.ingest.IngestPlugin.*;
|
||||
import static org.elasticsearch.plugin.ingest.IngestPlugin.INGEST_PARAM_CONTEXT_KEY;
|
||||
import static org.elasticsearch.plugin.ingest.IngestPlugin.PIPELINE_ID_PARAM_CONTEXT_KEY;
|
||||
|
||||
public class IngestRestFilter extends RestFilter {
|
||||
|
||||
|
@ -34,8 +34,8 @@ public class IngestRestFilter extends RestFilter {
|
|||
|
||||
@Override
|
||||
public void process(RestRequest request, RestChannel channel, RestFilterChain filterChain) throws Exception {
|
||||
if (request.hasParam(INGEST_PARAM)) {
|
||||
request.putInContext(INGEST_PARAM_CONTEXT_KEY, request.param(INGEST_PARAM));
|
||||
if (request.hasParam(PIPELINE_ID_PARAM)) {
|
||||
request.putInContext(PIPELINE_ID_PARAM_CONTEXT_KEY, request.param(PIPELINE_ID_PARAM));
|
||||
}
|
||||
filterChain.continueProcessing(request, channel);
|
||||
}
|
||||
|
|
|
@ -48,9 +48,9 @@ public class IngestActionFilter extends AbstractComponent implements ActionFilte
|
|||
|
||||
@Override
|
||||
public void apply(String action, ActionRequest request, ActionListener listener, ActionFilterChain chain) {
|
||||
String pipelineId = request.getFromContext(IngestPlugin.INGEST_PARAM_CONTEXT_KEY);
|
||||
String pipelineId = request.getFromContext(IngestPlugin.PIPELINE_ID_PARAM_CONTEXT_KEY);
|
||||
if (pipelineId == null) {
|
||||
pipelineId = request.getHeader(IngestPlugin.INGEST_PARAM);
|
||||
pipelineId = request.getHeader(IngestPlugin.PIPELINE_ID_PARAM);
|
||||
if (pipelineId == null) {
|
||||
chain.proceed(action, request, listener);
|
||||
return;
|
||||
|
@ -76,7 +76,7 @@ public class IngestActionFilter extends AbstractComponent implements ActionFilte
|
|||
// The IndexRequest has the same type on the node that receives the request and the node that
|
||||
// processes the primary action. This could lead to a pipeline being executed twice for the same
|
||||
// index request, hence this check
|
||||
if (indexRequest.hasHeader(IngestPlugin.INGEST_ALREADY_PROCESSED)) {
|
||||
if (indexRequest.hasHeader(IngestPlugin.PIPELINE_ALREADY_PROCESSED)) {
|
||||
chain.proceed(action, indexRequest, listener);
|
||||
return;
|
||||
}
|
||||
|
@ -89,7 +89,7 @@ public class IngestActionFilter extends AbstractComponent implements ActionFilte
|
|||
if (data.isModified()) {
|
||||
indexRequest.source(data.getDocument());
|
||||
}
|
||||
indexRequest.putHeader(IngestPlugin.INGEST_ALREADY_PROCESSED, true);
|
||||
indexRequest.putHeader(IngestPlugin.PIPELINE_ALREADY_PROCESSED, true);
|
||||
chain.proceed(action, indexRequest, listener);
|
||||
}
|
||||
|
||||
|
|
|
@ -91,7 +91,7 @@ public class IngestClientIT extends ESIntegTestCase {
|
|||
assertAcked(putMappingResponse);
|
||||
|
||||
client().prepareIndex("test", "type", "1").setSource("field1", "123.42 400 <foo>")
|
||||
.putHeader("ingest", "_id")
|
||||
.putHeader(IngestPlugin.PIPELINE_ID_PARAM, "_id")
|
||||
.get();
|
||||
|
||||
assertBusy(new Runnable() {
|
||||
|
@ -107,7 +107,7 @@ public class IngestClientIT extends ESIntegTestCase {
|
|||
|
||||
client().prepareBulk().add(
|
||||
client().prepareIndex("test", "type", "2").setSource("field1", "123.42 400 <foo>")
|
||||
).putHeader("ingest", "_id").get();
|
||||
).putHeader(IngestPlugin.PIPELINE_ID_PARAM, "_id").get();
|
||||
assertBusy(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
|
|
|
@ -71,7 +71,7 @@ public class IngestActionFilterTests extends ESTestCase {
|
|||
public void testApplyIngestIdViaRequestParam() throws Exception {
|
||||
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id");
|
||||
indexRequest.source("field", "value");
|
||||
indexRequest.putHeader(IngestPlugin.INGEST_PARAM, "_id");
|
||||
indexRequest.putHeader(IngestPlugin.PIPELINE_ID_PARAM, "_id");
|
||||
ActionListener actionListener = mock(ActionListener.class);
|
||||
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
|
||||
|
||||
|
@ -84,7 +84,7 @@ public class IngestActionFilterTests extends ESTestCase {
|
|||
public void testApplyIngestIdViaContext() throws Exception {
|
||||
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id");
|
||||
indexRequest.source("field", "value");
|
||||
indexRequest.putInContext(IngestPlugin.INGEST_PARAM_CONTEXT_KEY, "_id");
|
||||
indexRequest.putInContext(IngestPlugin.PIPELINE_ID_PARAM_CONTEXT_KEY, "_id");
|
||||
ActionListener actionListener = mock(ActionListener.class);
|
||||
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
|
||||
|
||||
|
@ -97,8 +97,8 @@ public class IngestActionFilterTests extends ESTestCase {
|
|||
public void testApplyAlreadyProcessed() throws Exception {
|
||||
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id");
|
||||
indexRequest.source("field", "value");
|
||||
indexRequest.putHeader(IngestPlugin.INGEST_PARAM, "_id");
|
||||
indexRequest.putHeader(IngestPlugin.INGEST_ALREADY_PROCESSED, true);
|
||||
indexRequest.putHeader(IngestPlugin.PIPELINE_ID_PARAM, "_id");
|
||||
indexRequest.putHeader(IngestPlugin.PIPELINE_ALREADY_PROCESSED, true);
|
||||
ActionListener actionListener = mock(ActionListener.class);
|
||||
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
|
||||
|
||||
|
@ -111,7 +111,7 @@ public class IngestActionFilterTests extends ESTestCase {
|
|||
public void testApply_executed() throws Exception {
|
||||
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id");
|
||||
indexRequest.source("field", "value");
|
||||
indexRequest.putHeader(IngestPlugin.INGEST_PARAM, "_id");
|
||||
indexRequest.putHeader(IngestPlugin.PIPELINE_ID_PARAM, "_id");
|
||||
ActionListener actionListener = mock(ActionListener.class);
|
||||
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
|
||||
|
||||
|
@ -135,7 +135,7 @@ public class IngestActionFilterTests extends ESTestCase {
|
|||
public void testApply_failed() throws Exception {
|
||||
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id");
|
||||
indexRequest.source("field", "value");
|
||||
indexRequest.putHeader(IngestPlugin.INGEST_PARAM, "_id");
|
||||
indexRequest.putHeader(IngestPlugin.PIPELINE_ID_PARAM, "_id");
|
||||
ActionListener actionListener = mock(ActionListener.class);
|
||||
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
|
||||
|
||||
|
@ -169,7 +169,7 @@ public class IngestActionFilterTests extends ESTestCase {
|
|||
filter = new IngestActionFilter(Settings.EMPTY, executionService);
|
||||
|
||||
BulkRequest bulkRequest = new BulkRequest();
|
||||
bulkRequest.putHeader(IngestPlugin.INGEST_PARAM, "_id");
|
||||
bulkRequest.putHeader(IngestPlugin.PIPELINE_ID_PARAM, "_id");
|
||||
int numRequest = scaledRandomIntBetween(8, 64);
|
||||
for (int i = 0; i < numRequest; i++) {
|
||||
if (rarely()) {
|
||||
|
|
|
@ -41,7 +41,7 @@
|
|||
"type": "list",
|
||||
"description" : "Default comma-separated list of fields to return in the response for updates"
|
||||
},
|
||||
"ingest" : {
|
||||
"pipeline_id" : {
|
||||
"type" : "string",
|
||||
"description" : "The pipeline id to preprocess incoming documents with"
|
||||
}
|
||||
|
|
|
@ -66,7 +66,7 @@
|
|||
"options" : ["internal", "external", "external_gte", "force"],
|
||||
"description" : "Specific version type"
|
||||
},
|
||||
"ingest" : {
|
||||
"pipeline_id" : {
|
||||
"type" : "string",
|
||||
"description" : "The pipeline id to preprocess incoming documents with"
|
||||
}
|
||||
|
|
|
@ -36,7 +36,7 @@
|
|||
index: test
|
||||
type: test
|
||||
id: 1
|
||||
ingest: "my_pipeline"
|
||||
pipeline_id: "my_pipeline"
|
||||
body: {field1: "_value"}
|
||||
|
||||
- do:
|
||||
|
@ -49,7 +49,7 @@
|
|||
|
||||
- do:
|
||||
ingest.bulk:
|
||||
ingest: "my_pipeline"
|
||||
pipeline_id: "my_pipeline"
|
||||
body:
|
||||
- '{ "index": { "_index": "test", "_type": "test", "_id": "2" } }'
|
||||
- '{ "field1": "_value" }'
|
||||
|
|
|
@ -34,7 +34,7 @@
|
|||
index: test
|
||||
type: test
|
||||
id: 1
|
||||
ingest: "my_pipeline"
|
||||
pipeline_id: "my_pipeline"
|
||||
body: {field1: "123.42 400 <foo>"}
|
||||
|
||||
- do:
|
||||
|
|
|
@ -33,7 +33,7 @@
|
|||
index: test
|
||||
type: test
|
||||
id: 1
|
||||
ingest: "my_pipeline"
|
||||
pipeline_id: "my_pipeline"
|
||||
body: {field1: "128.101.101.101"}
|
||||
|
||||
- do:
|
||||
|
@ -90,7 +90,7 @@
|
|||
index: test
|
||||
type: test
|
||||
id: 1
|
||||
ingest: "my_pipeline"
|
||||
pipeline_id: "my_pipeline"
|
||||
body: {field1: "128.101.101.101"}
|
||||
|
||||
- do:
|
||||
|
|
Loading…
Reference in New Issue