remove SimpleProcessor

This commit is contained in:
Tal Levy 2015-11-09 19:14:06 -08:00
parent 33035611a5
commit e9b72f5394
7 changed files with 28 additions and 159 deletions

View File

@ -1,67 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.processor.simple;
import org.elasticsearch.ingest.Data;
import org.elasticsearch.ingest.processor.ConfigurationUtils;
import org.elasticsearch.ingest.processor.Processor;
import java.util.Map;
public final class SimpleProcessor implements Processor {
public static final String TYPE = "simple";
private final String path;
private final String expectedValue;
private final String addField;
private final String addFieldValue;
public SimpleProcessor(String path, String expectedValue, String addField, String addFieldValue) {
this.path = path;
this.expectedValue = expectedValue;
this.addField = addField;
this.addFieldValue = addFieldValue;
}
@Override
public void execute(Data data) {
Object value = data.getProperty(path);
if (value != null) {
if (value.toString().equals(this.expectedValue)) {
data.addField(addField, addFieldValue);
}
}
}
public static class Factory implements Processor.Factory<SimpleProcessor> {
public SimpleProcessor create(Map<String, Object> config) {
String path = ConfigurationUtils.readStringProperty(config, "path");
String expectedValue = ConfigurationUtils.readStringProperty(config, "expected_value");
String addField = ConfigurationUtils.readStringProperty(config, "add_field", null);
String addFieldValue = ConfigurationUtils.readStringProperty(config, "add_field_value");
return new SimpleProcessor(path, expectedValue, addField, addFieldValue);
}
}
}

View File

@ -26,7 +26,6 @@ import org.elasticsearch.ingest.processor.date.DateProcessor;
import org.elasticsearch.ingest.processor.geoip.GeoIpProcessor;
import org.elasticsearch.ingest.processor.grok.GrokProcessor;
import org.elasticsearch.ingest.processor.mutate.MutateProcessor;
import org.elasticsearch.ingest.processor.simple.SimpleProcessor;
import org.elasticsearch.plugin.ingest.rest.IngestRestFilter;
import java.util.HashMap;
@ -43,7 +42,6 @@ public class IngestModule extends AbstractModule {
binder().bind(PipelineStore.class).asEagerSingleton();
binder().bind(PipelineStoreClient.class).asEagerSingleton();
addProcessor(SimpleProcessor.TYPE, new SimpleProcessor.Factory());
addProcessor(GeoIpProcessor.TYPE, new GeoIpProcessor.Factory());
addProcessor(GrokProcessor.TYPE, new GrokProcessor.Factory());
addProcessor(DateProcessor.TYPE, new DateProcessor.Factory());

View File

@ -20,11 +20,10 @@
package org.elasticsearch.ingest;
import org.elasticsearch.ingest.processor.Processor;
import org.elasticsearch.ingest.processor.simple.SimpleProcessor;
import org.elasticsearch.ingest.processor.mutate.MutateProcessor;
import org.elasticsearch.test.ESTestCase;
import org.junit.Test;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@ -37,43 +36,37 @@ public class PipelineFactoryTests extends ESTestCase {
public void testCreate() throws Exception {
Pipeline.Factory factory = new Pipeline.Factory();
Map<String, Processor.Factory> processorRegistry = new HashMap<>();
processorRegistry.put("simple", new SimpleProcessor.Factory());
processorRegistry.put("mutate", new MutateProcessor.Factory());
Map<String, Object> processorConfig = new HashMap<>();
processorConfig.put("path", "_path");
processorConfig.put("expected_value", "_expected_value");
processorConfig.put("add_field", "_add_field");
processorConfig.put("add_field_value", "_add_field_value");
processorConfig.put("uppercase", Arrays.asList("field1"));
Map<String, Object> pipelineConfig = new HashMap<>();
pipelineConfig.put("description", "_description");
pipelineConfig.put("processors", Collections.singletonList(Collections.singletonMap("simple", processorConfig)));
pipelineConfig.put("processors", Collections.singletonList(Collections.singletonMap("mutate", processorConfig)));
Pipeline pipeline = factory.create("_id", pipelineConfig, processorRegistry);
assertThat(pipeline.getId(), equalTo("_id"));
assertThat(pipeline.getDescription(), equalTo("_description"));
assertThat(pipeline.getProcessors().size(), equalTo(1));
assertThat(pipeline.getProcessors().get(0), instanceOf(SimpleProcessor.class));
assertThat(pipeline.getProcessors().get(0), instanceOf(MutateProcessor.class));
}
public void testCreate_unusedProcessorOptions() throws Exception {
Pipeline.Factory factory = new Pipeline.Factory();
Map<String, Processor.Factory> processorRegistry = new HashMap<>();
processorRegistry.put("simple", new SimpleProcessor.Factory());
processorRegistry.put("mutate", new MutateProcessor.Factory());
Map<String, Object> processorConfig = new HashMap<>();
processorConfig.put("path", "_path");
processorConfig.put("expected_value", "_expected_value");
processorConfig.put("add_field", "_add_field");
processorConfig.put("add_field_value", "_add_field_value");
processorConfig.put("uppercase", Arrays.asList("field1"));
processorConfig.put("foo", "bar");
Map<String, Object> pipelineConfig = new HashMap<>();
pipelineConfig.put("description", "_description");
pipelineConfig.put("processors", Collections.singletonList(Collections.singletonMap("simple", processorConfig)));
pipelineConfig.put("processors", Collections.singletonList(Collections.singletonMap("mutate", processorConfig)));
try {
factory.create("_id", pipelineConfig, processorRegistry);
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("processor [simple] doesn't support one or more provided configuration parameters [[foo]]"));
assertThat(e.getMessage(), equalTo("processor [mutate] doesn't support one or more provided configuration parameters [[foo]]"));
}
}

View File

@ -24,7 +24,7 @@ import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.text.StringText;
import org.elasticsearch.env.Environment;
import org.elasticsearch.ingest.processor.simple.SimpleProcessor;
import org.elasticsearch.ingest.processor.mutate.MutateProcessor;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.internal.InternalSearchHit;
import org.elasticsearch.test.ESTestCase;
@ -32,7 +32,6 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.junit.After;
import org.junit.Before;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
@ -57,7 +56,7 @@ public class PipelineStoreTests extends ESTestCase {
ClusterService clusterService = mock(ClusterService.class);
client = mock(PipelineStoreClient.class);
Environment environment = mock(Environment.class);
store = new PipelineStore(Settings.EMPTY, threadPool, environment, clusterService, client, Collections.singletonMap(SimpleProcessor.TYPE, new SimpleProcessor.Factory()));
store = new PipelineStore(Settings.EMPTY, threadPool, environment, clusterService, client, Collections.singletonMap(MutateProcessor.TYPE, new MutateProcessor.Factory()));
store.start();
}

View File

@ -29,7 +29,8 @@ import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.ingest.Data;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.ingest.processor.simple.SimpleProcessor;
import org.elasticsearch.ingest.processor.Processor;
import org.elasticsearch.ingest.processor.mutate.MutateProcessor;
import org.elasticsearch.plugin.ingest.IngestPlugin;
import org.elasticsearch.plugin.ingest.PipelineExecutionService;
import org.elasticsearch.plugin.ingest.PipelineStore;
@ -40,6 +41,8 @@ import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Matchers.any;
@ -164,7 +167,14 @@ public class IngestActionFilterTests extends ESTestCase {
.build()
);
PipelineStore store = mock(PipelineStore.class);
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", Arrays.asList(new SimpleProcessor("field1", "value1", "field2", "value2"))));
Map<String, Object> mutateConfig = new HashMap<>();
Map<String, Object> update = new HashMap<>();
update.put("field2", "value2");
mutateConfig.put("update", update);
Processor mutateProcessor = (new MutateProcessor.Factory()).create(mutateConfig);
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", Arrays.asList(mutateProcessor)));
executionService = new PipelineExecutionService(store, threadPool);
filter = new IngestActionFilter(Settings.EMPTY, executionService);

View File

@ -12,11 +12,10 @@
"description": "_description",
"processors": [
{
"simple" : {
"path" : "field1",
"expected_value" : "_value",
"add_field" : "field2",
"add_field_value" : "_value"
"mutate" : {
"update" : {
"field2": "_value"
}
}
}
]

View File

@ -1,63 +0,0 @@
---
"Test simple processor":
- do:
cluster.health:
wait_for_status: green
- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"description": "_description",
"processors": [
{
"simple" : {
"path" : "field1",
"expected_value" : "_value",
"add_field" : "field2",
"add_field_value" : "_value"
}
}
]
}
- match: { _id: "my_pipeline" }
# Simulate a Thread.sleep(), because pipeline are updated in the background
- do:
catch: request_timeout
cluster.health:
wait_for_nodes: 99
timeout: 2s
- match: { "timed_out": true }
- do:
ingest.index:
index: test
type: test
id: 1
pipeline_id: "my_pipeline"
body: {field1: "_value"}
- do:
get:
index: test
type: test
id: 1
- match: { _source.field1: "_value" }
- match: { _source.field2: "_value" }
- do:
ingest.bulk:
pipeline_id: "my_pipeline"
body:
- '{ "index": { "_index": "test", "_type": "test", "_id": "2" } }'
- '{ "field1": "_value" }'
- do:
get:
index: test
type: test
id: 2
- match: { _source.field1: "_value" }
- match: { _source.field2: "_value" }