Merge branch 'feature/ingest' into ingest/date

javanna 2015-11-06 12:08:14 +01:00 committed by Luca Cavanna
commit 1d2e244bac
21 changed files with 350 additions and 57 deletions

View File

@@ -82,7 +82,11 @@ public final class Pipeline {
for (Map.Entry<String, Map<String, Object>> entry : processor.entrySet()) {
Processor.Factory factory = processorRegistry.get(entry.getKey());
if (factory != null) {
processors.add(factory.create(entry.getValue()));
Map<String, Object> processorConfig = entry.getValue();
processors.add(factory.create(processorConfig));
if (processorConfig.isEmpty() == false) {
throw new IllegalArgumentException("processor [" + entry.getKey() + "] doesn't support one or more provided configuration parameters [" + Arrays.toString(processorConfig.keySet().toArray()) + "]");
}
} else {
throw new IllegalArgumentException("No processor type exist with name [" + entry.getKey() + "]");
}
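
The check above works because each factory is expected to remove every setting it consumes from the config map, so whatever remains after create() is by definition unsupported. A minimal, self-contained sketch of that consume-then-verify contract (processor name and settings are hypothetical):

import java.util.HashMap;
import java.util.Map;

final class ConsumeThenVerifyDemo {
    public static void main(String[] args) {
        Map<String, Object> config = new HashMap<>();
        config.put("field", "message");
        config.put("tyop", true); // misspelled option that no factory consumes

        Object field = config.remove("field"); // the factory consumes its setting
        System.out.println("consumed field = " + field);

        if (config.isEmpty() == false) { // leftovers are unsupported parameters
            System.out.println("would throw: processor [demo] doesn't support one or more provided configuration parameters " + config.keySet());
        }
    }
}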

View File

@@ -0,0 +1,51 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.processor;
import java.util.Map;
public final class ConfigurationUtils {
private ConfigurationUtils() {
}
/**
* Returns and removes the specified property from the specified configuration map.
*
* If the property value isn't of type {@link String}, an {@link IllegalArgumentException} is thrown.
* If the property is missing and no default value has been specified, an {@link IllegalArgumentException} is thrown.
*/
public static String readStringProperty(Map<String, Object> configuration, String propertyName, String defaultValue) {
Object value = configuration.remove(propertyName);
if (value == null && defaultValue != null) {
return defaultValue;
} else if (value == null) {
throw new IllegalArgumentException("required property [" + propertyName + "] is missing");
}
if (value instanceof String) {
return (String) value;
} else {
throw new IllegalArgumentException("property [" + propertyName + "] isn't a string, but of type [" + value.getClass() + "]");
}
}
}
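
A quick illustration of the three cases described in the Javadoc above; the property names are hypothetical:

import java.util.HashMap;
import java.util.Map;
import static org.elasticsearch.ingest.processor.ConfigurationUtils.readStringProperty;

class ReadStringPropertyDemo {
    public static void main(String[] args) {
        Map<String, Object> config = new HashMap<>();
        config.put("ip_field", "client_ip");

        // present: value is returned and removed from the map
        String ip = readStringProperty(config, "ip_field", null);
        System.out.println(ip); // client_ip

        // missing, non-null default: the default is returned
        String target = readStringProperty(config, "target_field", "geoip");
        System.out.println(target); // geoip

        // missing, no default: IllegalArgumentException
        try {
            readStringProperty(config, "database_file", null);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // required property [database_file] is missing
        }
    }
}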

View File

@@ -27,6 +27,7 @@ import com.maxmind.geoip2.record.*;
import org.elasticsearch.SpecialPermission;
import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.ingest.Data;
import org.elasticsearch.ingest.processor.ConfigurationUtils;
import org.elasticsearch.ingest.processor.Processor;
import java.io.InputStream;
@@ -41,18 +42,19 @@ import java.security.PrivilegedAction;
import java.util.HashMap;
import java.util.Map;
import static org.elasticsearch.ingest.processor.ConfigurationUtils.readStringProperty;
public final class GeoIpProcessor implements Processor {
public static final String TYPE = "geoip";
private final String ipField;
private final String targetField;
// package-protected visibility for tests:
final DatabaseReader dbReader;
private final DatabaseReader dbReader;
GeoIpProcessor(String ipField, DatabaseReader dbReader, String targetField) throws IOException {
this.ipField = ipField;
this.targetField = targetField == null ? "geoip" : targetField;
this.targetField = targetField;
this.dbReader = dbReader;
}
@@ -80,6 +82,18 @@ public final class GeoIpProcessor implements Processor {
data.addField(targetField, geoData);
}
String getIpField() {
return ipField;
}
String getTargetField() {
return targetField;
}
DatabaseReader getDbReader() {
return dbReader;
}
private Map<String, Object> retrieveCityGeoData(InetAddress ipAddress) {
SecurityManager sm = System.getSecurityManager();
if (sm != null) {
@@ -151,19 +165,12 @@ public final class GeoIpProcessor implements Processor {
private final DatabaseReaderService databaseReaderService = new DatabaseReaderService();
public Processor create(Map<String, Object> config) throws IOException {
String ipField = (String) config.get("ip_field");
String targetField = (String) config.get("target_field");
if (targetField == null) {
targetField = "geoip";
}
String databaseFile = (String) config.get("database_file");
if (databaseFile == null) {
databaseFile = "GeoLite2-City.mmdb";
}
String ipField = readStringProperty(config, "ip_field", null);
String targetField = readStringProperty(config, "target_field", "geoip");
String databaseFile = readStringProperty(config, "database_file", "GeoLite2-City.mmdb");
Path databasePath = geoIpConfigDirectory.resolve(databaseFile);
if (Files.exists(databasePath)) {
if (Files.exists(databasePath) && Files.isRegularFile(databasePath)) {
try (InputStream database = Files.newInputStream(databasePath, StandardOpenOption.READ)) {
DatabaseReader databaseReader = databaseReaderService.getOrCreateDatabaseReader(databaseFile, database);
return new GeoIpProcessor(ipField, databaseReader, targetField);
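
With readStringProperty handling the parsing, the three settings follow one rule: the null default makes ip_field mandatory, while target_field and database_file fall back silently. A hypothetical fragment, assuming factory is a configured GeoIpProcessor.Factory:

Map<String, Object> config = new HashMap<>();
config.put("ip_field", "client_ip"); // required: its default above is null
// target_field omitted -> resolves to "geoip"
// database_file omitted -> resolves to "GeoLite2-City.mmdb"
Processor processor = factory.create(config); // config map is empty afterwards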

View File

@@ -20,6 +20,7 @@
package org.elasticsearch.ingest.processor.grok;
import org.elasticsearch.ingest.Data;
import org.elasticsearch.ingest.processor.ConfigurationUtils;
import org.elasticsearch.ingest.processor.Processor;
import java.io.IOException;
@@ -55,18 +56,28 @@ public final class GrokProcessor implements Processor {
}
}
String getMatchField() {
return matchField;
}
Grok getGrok() {
return grok;
}
public static class Factory implements Processor.Factory {
private Path grokConfigDirectory;
public Processor create(Map<String, Object> config) throws IOException {
String matchField = (String) config.get("field");
String matchPattern = (String) config.get("pattern");
String matchField = ConfigurationUtils.readStringProperty(config, "field", null);
String matchPattern = ConfigurationUtils.readStringProperty(config, "pattern", null);
Map<String, String> patternBank = new HashMap<>();
Path patternsDirectory = grokConfigDirectory.resolve("patterns");
try (DirectoryStream<Path> stream = Files.newDirectoryStream(patternsDirectory)) {
for (Path patternFilePath : stream) {
try(InputStream is = Files.newInputStream(patternFilePath, StandardOpenOption.READ)) {
PatternUtils.loadBankFromStream(patternBank, is);
if (Files.isRegularFile(patternFilePath)) {
try(InputStream is = Files.newInputStream(patternFilePath, StandardOpenOption.READ)) {
PatternUtils.loadBankFromStream(patternBank, is);
}
}
}
}
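
The added Files.isRegularFile guard lets the pattern loader skip subdirectories and other non-file entries instead of failing on them. A self-contained sketch of the same directory walk, with a hypothetical path:

import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

class PatternDirWalkDemo {
    public static void main(String[] args) throws IOException {
        Path patternsDir = Paths.get("config/ingest/grok/patterns"); // hypothetical location
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(patternsDir)) {
            for (Path entry : stream) {
                if (Files.isRegularFile(entry)) { // skip subdirectories and the like
                    System.out.println("would load " + entry.getFileName());
                }
            }
        }
    }
}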

View File

@@ -20,6 +20,7 @@
package org.elasticsearch.ingest.processor.simple;
import org.elasticsearch.ingest.Data;
import org.elasticsearch.ingest.processor.ConfigurationUtils;
import org.elasticsearch.ingest.processor.Processor;
import java.io.IOException;
@@ -55,10 +56,10 @@ public final class SimpleProcessor implements Processor {
public static class Factory implements Processor.Factory {
public Processor create(Map<String, Object> config) {
String path = (String) config.get("path");
String expectedValue = (String) config.get("expected_value");
String addField = (String) config.get("add_field");
String addFieldValue = (String) config.get("add_field_value");
String path = ConfigurationUtils.readStringProperty(config, "path", null);
String expectedValue = ConfigurationUtils.readStringProperty(config, "expected_value", null);
String addField = ConfigurationUtils.readStringProperty(config, "add_field", null);
String addFieldValue = ConfigurationUtils.readStringProperty(config, "add_field_value", null);
return new SimpleProcessor(path, expectedValue, addField, addFieldValue);
}

View File

@@ -50,9 +50,9 @@ import static org.elasticsearch.common.settings.Settings.settingsBuilder;
public class IngestPlugin extends Plugin {
public static final String INGEST_PARAM_CONTEXT_KEY = "__ingest__";
public static final String INGEST_PARAM = "ingest";
public static final String INGEST_ALREADY_PROCESSED = "ingest_already_processed";
public static final String PIPELINE_ID_PARAM_CONTEXT_KEY = "__pipeline_id__";
public static final String PIPELINE_ID_PARAM = "pipeline_id";
public static final String PIPELINE_ALREADY_PROCESSED = "ingest_already_processed";
public static final String NAME = "ingest";
private final Settings nodeSettings;

View File

@@ -114,6 +114,10 @@ public class PipelineStore extends AbstractLifecycleComponent {
return result;
}
public Pipeline constructPipeline(String id, Map<String, Object> config) throws IOException {
return factory.create(id, config, processorFactoryRegistry);
}
void updatePipelines() throws IOException {
// note: this process isn't fast or smart, but the idea is that there will not be many pipelines,
// so the goal is to keep the update logic simple.
@@ -131,7 +135,7 @@ public class PipelineStore extends AbstractLifecycleComponent {
}
changed++;
Pipeline pipeline = factory.create(hit.getId(), hit.sourceAsMap(), processorFactoryRegistry);
Pipeline pipeline = constructPipeline(hit.getId(), hit.sourceAsMap());
newPipelines.put(pipelineId, new PipelineReference(pipeline, hit.getVersion(), pipelineSource));
}

View File

@@ -23,7 +23,7 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.rest.*;
import static org.elasticsearch.plugin.ingest.IngestPlugin.*;
import static org.elasticsearch.plugin.ingest.IngestPlugin.INGEST_PARAM_CONTEXT_KEY;
import static org.elasticsearch.plugin.ingest.IngestPlugin.PIPELINE_ID_PARAM_CONTEXT_KEY;
public class IngestRestFilter extends RestFilter {
@@ -34,8 +34,8 @@ public class IngestRestFilter extends RestFilter {
@Override
public void process(RestRequest request, RestChannel channel, RestFilterChain filterChain) throws Exception {
if (request.hasParam(INGEST_PARAM)) {
request.putInContext(INGEST_PARAM_CONTEXT_KEY, request.param(INGEST_PARAM));
if (request.hasParam(PIPELINE_ID_PARAM)) {
request.putInContext(PIPELINE_ID_PARAM_CONTEXT_KEY, request.param(PIPELINE_ID_PARAM));
}
filterChain.continueProcessing(request, channel);
}

View File

@@ -48,9 +48,9 @@ public class IngestActionFilter extends AbstractComponent implements ActionFilte
@Override
public void apply(String action, ActionRequest request, ActionListener listener, ActionFilterChain chain) {
String pipelineId = request.getFromContext(IngestPlugin.INGEST_PARAM_CONTEXT_KEY);
String pipelineId = request.getFromContext(IngestPlugin.PIPELINE_ID_PARAM_CONTEXT_KEY);
if (pipelineId == null) {
pipelineId = request.getHeader(IngestPlugin.INGEST_PARAM);
pipelineId = request.getHeader(IngestPlugin.PIPELINE_ID_PARAM);
if (pipelineId == null) {
chain.proceed(action, request, listener);
return;
@@ -76,7 +76,7 @@ public class IngestActionFilter extends AbstractComponent implements ActionFilte
// The IndexRequest has the same type on the node that receives the request and the node that
// processes the primary action. This could lead to a pipeline being executed twice for the same
// index request, hence this check
if (indexRequest.hasHeader(IngestPlugin.INGEST_ALREADY_PROCESSED)) {
if (indexRequest.hasHeader(IngestPlugin.PIPELINE_ALREADY_PROCESSED)) {
chain.proceed(action, indexRequest, listener);
return;
}
@@ -89,7 +89,7 @@ public class IngestActionFilter extends AbstractComponent implements ActionFilte
if (data.isModified()) {
indexRequest.source(data.getDocument());
}
indexRequest.putHeader(IngestPlugin.INGEST_ALREADY_PROCESSED, true);
indexRequest.putHeader(IngestPlugin.PIPELINE_ALREADY_PROCESSED, true);
chain.proceed(action, indexRequest, listener);
}
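
The header-based guard is a mark-before-forward pattern. A schematic sketch, not the actual filter code; proceed and executePipeline stand in for the filter-chain and execution-service calls above:

void apply(IndexRequest indexRequest) {
    if (indexRequest.hasHeader(IngestPlugin.PIPELINE_ALREADY_PROCESSED)) {
        proceed(indexRequest); // the pipeline already ran for this request
        return;
    }
    executePipeline(indexRequest); // may rewrite the request source
    indexRequest.putHeader(IngestPlugin.PIPELINE_ALREADY_PROCESSED, true);
    proceed(indexRequest); // the primary action now skips the pipeline
}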

View File

@@ -28,24 +28,37 @@ import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.plugin.ingest.PipelineStore;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.function.Supplier;
import java.io.IOException;
import java.util.Map;
public class PutPipelineTransportAction extends HandledTransportAction<PutPipelineRequest, PutPipelineResponse> {
private final TransportIndexAction indexAction;
private final PipelineStore pipelineStore;
@Inject
public PutPipelineTransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, TransportIndexAction indexAction) {
public PutPipelineTransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, TransportIndexAction indexAction, PipelineStore pipelineStore) {
super(settings, PutPipelineAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, PutPipelineRequest::new);
this.indexAction = indexAction;
this.pipelineStore = pipelineStore;
}
@Override
protected void doExecute(PutPipelineRequest request, ActionListener<PutPipelineResponse> listener) {
// validates the pipeline and processor configuration:
Map<String, Object> pipelineConfig = XContentHelper.convertToMap(request.source(), false).v2();
try {
pipelineStore.constructPipeline(request.id(), pipelineConfig);
} catch (IOException e) {
listener.onFailure(e);
return;
}
IndexRequest indexRequest = new IndexRequest();
indexRequest.index(PipelineStore.INDEX);
indexRequest.type(PipelineStore.TYPE);

View File

@@ -91,7 +91,7 @@ public class IngestClientIT extends ESIntegTestCase {
assertAcked(putMappingResponse);
client().prepareIndex("test", "type", "1").setSource("field1", "123.42 400 <foo>")
.putHeader("ingest", "_id")
.putHeader(IngestPlugin.PIPELINE_ID_PARAM, "_id")
.get();
assertBusy(new Runnable() {
@@ -107,7 +107,7 @@ public class IngestClientIT extends ESIntegTestCase {
client().prepareBulk().add(
client().prepareIndex("test", "type", "2").setSource("field1", "123.42 400 <foo>")
).putHeader("ingest", "_id").get();
).putHeader(IngestPlugin.PIPELINE_ID_PARAM, "_id").get();
assertBusy(new Runnable() {
@Override
public void run() {

View File

@@ -0,0 +1,80 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest;
import org.elasticsearch.ingest.processor.Processor;
import org.elasticsearch.ingest.processor.simple.SimpleProcessor;
import org.elasticsearch.test.ESTestCase;
import org.junit.Test;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
public class PipelineFactoryTests extends ESTestCase {
public void testCreate() throws Exception {
Pipeline.Factory factory = new Pipeline.Factory();
Map<String, Processor.Factory> processorRegistry = new HashMap<>();
processorRegistry.put("simple", new SimpleProcessor.Factory());
Map<String, Object> processorConfig = new HashMap<>();
processorConfig.put("path", "_path");
processorConfig.put("expected_value", "_expected_value");
processorConfig.put("add_field", "_add_field");
processorConfig.put("add_field_value", "_add_field_value");
Map<String, Object> pipelineConfig = new HashMap<>();
pipelineConfig.put("description", "_description");
pipelineConfig.put("processors", Collections.singletonList(Collections.singletonMap("simple", processorConfig)));
Pipeline pipeline = factory.create("_id", pipelineConfig, processorRegistry);
assertThat(pipeline.getId(), equalTo("_id"));
assertThat(pipeline.getDescription(), equalTo("_description"));
assertThat(pipeline.getProcessors().size(), equalTo(1));
assertThat(pipeline.getProcessors().get(0), instanceOf(SimpleProcessor.class));
}
public void testCreate_unusedProcessorOptions() throws Exception {
Pipeline.Factory factory = new Pipeline.Factory();
Map<String, Processor.Factory> processorRegistry = new HashMap<>();
processorRegistry.put("simple", new SimpleProcessor.Factory());
Map<String, Object> processorConfig = new HashMap<>();
processorConfig.put("path", "_path");
processorConfig.put("expected_value", "_expected_value");
processorConfig.put("add_field", "_add_field");
processorConfig.put("add_field_value", "_add_field_value");
processorConfig.put("foo", "bar");
Map<String, Object> pipelineConfig = new HashMap<>();
pipelineConfig.put("description", "_description");
pipelineConfig.put("processors", Collections.singletonList(Collections.singletonMap("simple", processorConfig)));
try {
factory.create("_id", pipelineConfig, processorRegistry);
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("processor [simple] doesn't support one or more provided configuration parameters [[foo]]"));
}
}
}

View File

@@ -29,6 +29,8 @@ import java.io.ByteArrayInputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
public class GeoProcessorFactoryTests extends ESTestCase {
@@ -46,22 +48,48 @@ public class GeoProcessorFactoryTests extends ESTestCase {
public void testBuild_defaults() throws Exception {
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory();
factory.setConfigDirectory(configDir);
GeoIpProcessor processor = (GeoIpProcessor) factory.create(Collections.emptyMap());
assertThat(processor.dbReader.getMetadata().getDatabaseType(), equalTo("GeoLite2-City"));
Map<String, Object> config = new HashMap<>();
config.put("ip_field", "_field");
GeoIpProcessor processor = (GeoIpProcessor) factory.create(config);
assertThat(processor.getIpField(), equalTo("_field"));
assertThat(processor.getTargetField(), equalTo("geoip"));
assertThat(processor.getDbReader().getMetadata().getDatabaseType(), equalTo("GeoLite2-City"));
}
public void testBuild_targetField() throws Exception {
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory();
factory.setConfigDirectory(configDir);
Map<String, Object> config = new HashMap<>();
config.put("ip_field", "_field");
config.put("target_field", "_field");
GeoIpProcessor processor = (GeoIpProcessor) factory.create(config);
assertThat(processor.getIpField(), equalTo("_field"));
assertThat(processor.getTargetField(), equalTo("_field"));
}
public void testBuild_dbFile() throws Exception {
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory();
factory.setConfigDirectory(configDir);
GeoIpProcessor processor = (GeoIpProcessor) factory.create(Collections.singletonMap("database_file", "GeoLite2-Country.mmdb"));
assertThat(processor.dbReader.getMetadata().getDatabaseType(), equalTo("GeoLite2-Country"));
Map<String, Object> config = new HashMap<>();
config.put("ip_field", "_field");
config.put("database_file", "GeoLite2-Country.mmdb");
GeoIpProcessor processor = (GeoIpProcessor) factory.create(config);
assertThat(processor.getIpField(), equalTo("_field"));
assertThat(processor.getTargetField(), equalTo("geoip"));
assertThat(processor.getDbReader().getMetadata().getDatabaseType(), equalTo("GeoLite2-Country"));
}
public void testBuild_nonExistingDbFile() throws Exception {
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory();
factory.setConfigDirectory(configDir);
Map<String, Object> config = new HashMap<>();
config.put("ip_field", "_field");
config.put("database_file", "does-not-exist.mmdb");
try {
factory.create(Collections.singletonMap("database_file", "does-not-exist.mmdb"));
factory.create(config);
fail("exception expected because the database file doesn't exist");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), startsWith("database file [does-not-exist.mmdb] doesn't exist in"));
}

View File

@@ -0,0 +1,57 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.processor.grok;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
public class GrokProcessorFactoryTests extends ESTestCase {
private Path configDir;
@Before
public void prepareConfigDirectory() throws Exception {
this.configDir = createTempDir();
Path grokDir = configDir.resolve("ingest").resolve("grok");
Path patternsDir = grokDir.resolve("patterns");
Files.createDirectories(patternsDir);
}
public void testBuild() throws Exception {
GrokProcessor.Factory factory = new GrokProcessor.Factory();
factory.setConfigDirectory(configDir);
Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
config.put("pattern", "(?<foo>\\w+)");
GrokProcessor processor = (GrokProcessor) factory.create(config);
assertThat(processor.getMatchField(), equalTo("_field"));
assertThat(processor.getGrok(), notNullValue());
}
}

View File

@@ -71,7 +71,7 @@ public class IngestActionFilterTests extends ESTestCase {
public void testApplyIngestIdViaRequestParam() throws Exception {
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id");
indexRequest.source("field", "value");
indexRequest.putHeader(IngestPlugin.INGEST_PARAM, "_id");
indexRequest.putHeader(IngestPlugin.PIPELINE_ID_PARAM, "_id");
ActionListener actionListener = mock(ActionListener.class);
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
@@ -84,7 +84,7 @@ public class IngestActionFilterTests extends ESTestCase {
public void testApplyIngestIdViaContext() throws Exception {
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id");
indexRequest.source("field", "value");
indexRequest.putInContext(IngestPlugin.INGEST_PARAM_CONTEXT_KEY, "_id");
indexRequest.putInContext(IngestPlugin.PIPELINE_ID_PARAM_CONTEXT_KEY, "_id");
ActionListener actionListener = mock(ActionListener.class);
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
@@ -97,8 +97,8 @@ public class IngestActionFilterTests extends ESTestCase {
public void testApplyAlreadyProcessed() throws Exception {
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id");
indexRequest.source("field", "value");
indexRequest.putHeader(IngestPlugin.INGEST_PARAM, "_id");
indexRequest.putHeader(IngestPlugin.INGEST_ALREADY_PROCESSED, true);
indexRequest.putHeader(IngestPlugin.PIPELINE_ID_PARAM, "_id");
indexRequest.putHeader(IngestPlugin.PIPELINE_ALREADY_PROCESSED, true);
ActionListener actionListener = mock(ActionListener.class);
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
@@ -111,7 +111,7 @@ public class IngestActionFilterTests extends ESTestCase {
public void testApply_executed() throws Exception {
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id");
indexRequest.source("field", "value");
indexRequest.putHeader(IngestPlugin.INGEST_PARAM, "_id");
indexRequest.putHeader(IngestPlugin.PIPELINE_ID_PARAM, "_id");
ActionListener actionListener = mock(ActionListener.class);
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
@@ -135,7 +135,7 @@ public class IngestActionFilterTests extends ESTestCase {
public void testApply_failed() throws Exception {
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id");
indexRequest.source("field", "value");
indexRequest.putHeader(IngestPlugin.INGEST_PARAM, "_id");
indexRequest.putHeader(IngestPlugin.PIPELINE_ID_PARAM, "_id");
ActionListener actionListener = mock(ActionListener.class);
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
@@ -169,7 +169,7 @@ public class IngestActionFilterTests extends ESTestCase {
filter = new IngestActionFilter(Settings.EMPTY, executionService);
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.putHeader(IngestPlugin.INGEST_PARAM, "_id");
bulkRequest.putHeader(IngestPlugin.PIPELINE_ID_PARAM, "_id");
int numRequest = scaledRandomIntBetween(8, 64);
for (int i = 0; i < numRequest; i++) {
if (rarely()) {

View File

@@ -41,7 +41,7 @@
"type": "list",
"description" : "Default comma-separated list of fields to return in the response for updates"
},
"ingest" : {
"pipeline_id" : {
"type" : "string",
"description" : "The pipeline id to preprocess incoming documents with"
}

View File

@@ -66,7 +66,7 @@
"options" : ["internal", "external", "external_gte", "force"],
"description" : "Specific version type"
},
"ingest" : {
"pipeline_id" : {
"type" : "string",
"description" : "The pipeline id to preprocess incoming documents with"
}

View File

@@ -55,3 +55,40 @@
catch: missing
ingest.get_pipeline:
ids: "my_pipeline"
---
"Test invalid config":
- do:
cluster.health:
wait_for_status: green
- do:
catch: param
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"description": "_description",
"processors": [
{
"geoip" : {
}
}
]
}
- do:
catch: param
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"description": "_description",
"processors": [
{
"geoip" : {
"ip_field" : 1234
}
}
]
}

View File

@@ -36,7 +36,7 @@
index: test
type: test
id: 1
ingest: "my_pipeline"
pipeline_id: "my_pipeline"
body: {field1: "_value"}
- do:
@@ -49,7 +49,7 @@
- do:
ingest.bulk:
ingest: "my_pipeline"
pipeline_id: "my_pipeline"
body:
- '{ "index": { "_index": "test", "_type": "test", "_id": "2" } }'
- '{ "field1": "_value" }'

View File

@@ -34,7 +34,7 @@
index: test
type: test
id: 1
ingest: "my_pipeline"
pipeline_id: "my_pipeline"
body: {field1: "123.42 400 <foo>"}
- do:

View File

@@ -33,7 +33,7 @@
index: test
type: test
id: 1
ingest: "my_pipeline"
pipeline_id: "my_pipeline"
body: {field1: "128.101.101.101"}
- do:
@@ -90,7 +90,7 @@
index: test
type: test
id: 1
ingest: "my_pipeline"
pipeline_id: "my_pipeline"
body: {field1: "128.101.101.101"}
- do: