Fix some todos
This commit is contained in:
parent
f3883343cb
commit
a7730b05b2
|
@ -30,6 +30,10 @@ import java.util.Map;
|
||||||
*/
|
*/
|
||||||
public final class Pipeline {
|
public final class Pipeline {
|
||||||
|
|
||||||
|
final static String DESCRIPTION_KEY = "description";
|
||||||
|
final static String PROCESSORS_KEY = "processors";
|
||||||
|
final static String ON_FAILURE_KEY = "on_failure";
|
||||||
|
|
||||||
private final String id;
|
private final String id;
|
||||||
private final String description;
|
private final String description;
|
||||||
private final CompoundProcessor compoundProcessor;
|
private final CompoundProcessor compoundProcessor;
|
||||||
|
@ -79,9 +83,9 @@ public final class Pipeline {
|
||||||
public final static class Factory {
|
public final static class Factory {
|
||||||
|
|
||||||
public Pipeline create(String id, Map<String, Object> config, Map<String, Processor.Factory> processorRegistry) throws Exception {
|
public Pipeline create(String id, Map<String, Object> config, Map<String, Processor.Factory> processorRegistry) throws Exception {
|
||||||
String description = ConfigurationUtils.readOptionalStringProperty(config, "description"); // TODO(simonw): can we make these strings constants?
|
String description = ConfigurationUtils.readOptionalStringProperty(config, DESCRIPTION_KEY);
|
||||||
List<Processor> processors = readProcessors("processors", processorRegistry, config);
|
List<Processor> processors = readProcessors(PROCESSORS_KEY, processorRegistry, config);
|
||||||
List<Processor> onFailureProcessors = readProcessors("on_failure", processorRegistry, config);
|
List<Processor> onFailureProcessors = readProcessors(ON_FAILURE_KEY, processorRegistry, config);
|
||||||
if (config.isEmpty() == false) {
|
if (config.isEmpty() == false) {
|
||||||
throw new IllegalArgumentException("pipeline [" + id + "] doesn't support one or more provided configuration parameters " + Arrays.toString(config.keySet().toArray()));
|
throw new IllegalArgumentException("pipeline [" + id + "] doesn't support one or more provided configuration parameters " + Arrays.toString(config.keySet().toArray()));
|
||||||
}
|
}
|
||||||
|
@ -106,7 +110,7 @@ public final class Pipeline {
|
||||||
private Processor readProcessor(Map<String, Processor.Factory> processorRegistry, String type, Map<String, Object> config) throws Exception {
|
private Processor readProcessor(Map<String, Processor.Factory> processorRegistry, String type, Map<String, Object> config) throws Exception {
|
||||||
Processor.Factory factory = processorRegistry.get(type);
|
Processor.Factory factory = processorRegistry.get(type);
|
||||||
if (factory != null) {
|
if (factory != null) {
|
||||||
List<Processor> onFailureProcessors = readProcessors("on_failure", processorRegistry, config);
|
List<Processor> onFailureProcessors = readProcessors(ON_FAILURE_KEY, processorRegistry, config);
|
||||||
Processor processor = factory.create(config);
|
Processor processor = factory.create(config);
|
||||||
if (config.isEmpty() == false) {
|
if (config.isEmpty() == false) {
|
||||||
throw new IllegalArgumentException("processor [" + type + "] doesn't support one or more provided configuration parameters " + Arrays.toString(config.keySet().toArray()));
|
throw new IllegalArgumentException("processor [" + type + "] doesn't support one or more provided configuration parameters " + Arrays.toString(config.keySet().toArray()));
|
||||||
|
|
|
@ -57,9 +57,7 @@ public interface ValueSource {
|
||||||
valueSourceList.add(wrap(item, templateService));
|
valueSourceList.add(wrap(item, templateService));
|
||||||
}
|
}
|
||||||
return new ListValue(valueSourceList);
|
return new ListValue(valueSourceList);
|
||||||
} else if (value == null || value instanceof Integer || // TODO(simonw): maybe we just check for Number?
|
} else if (value == null || value instanceof Number || value instanceof Boolean) {
|
||||||
value instanceof Long || value instanceof Float ||
|
|
||||||
value instanceof Double || value instanceof Boolean) {
|
|
||||||
return new ObjectValue(value);
|
return new ObjectValue(value);
|
||||||
} else if (value instanceof String) {
|
} else if (value instanceof String) {
|
||||||
return new TemplatedValue(templateService.compile((String) value));
|
return new TemplatedValue(templateService.compile((String) value));
|
||||||
|
|
|
@ -17,8 +17,9 @@
|
||||||
* under the License.
|
* under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.elasticsearch.ingest;
|
package org.elasticsearch.ingest.core;
|
||||||
|
|
||||||
|
import org.elasticsearch.ingest.TestProcessor;
|
||||||
import org.elasticsearch.ingest.core.Pipeline;
|
import org.elasticsearch.ingest.core.Pipeline;
|
||||||
import org.elasticsearch.ingest.core.Processor;
|
import org.elasticsearch.ingest.core.Processor;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
|
@ -34,8 +35,8 @@ public class PipelineFactoryTests extends ESTestCase {
|
||||||
public void testCreate() throws Exception {
|
public void testCreate() throws Exception {
|
||||||
Map<String, Object> processorConfig = new HashMap<>();
|
Map<String, Object> processorConfig = new HashMap<>();
|
||||||
Map<String, Object> pipelineConfig = new HashMap<>();
|
Map<String, Object> pipelineConfig = new HashMap<>();
|
||||||
pipelineConfig.put("description", "_description");
|
pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description");
|
||||||
pipelineConfig.put("processors", Collections.singletonList(Collections.singletonMap("test", processorConfig)));
|
pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig)));
|
||||||
Pipeline.Factory factory = new Pipeline.Factory();
|
Pipeline.Factory factory = new Pipeline.Factory();
|
||||||
Map<String, Processor.Factory> processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory());
|
Map<String, Processor.Factory> processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory());
|
||||||
Pipeline pipeline = factory.create("_id", pipelineConfig, processorRegistry);
|
Pipeline pipeline = factory.create("_id", pipelineConfig, processorRegistry);
|
||||||
|
@ -48,9 +49,9 @@ public class PipelineFactoryTests extends ESTestCase {
|
||||||
public void testCreateWithPipelineOnFailure() throws Exception {
|
public void testCreateWithPipelineOnFailure() throws Exception {
|
||||||
Map<String, Object> processorConfig = new HashMap<>();
|
Map<String, Object> processorConfig = new HashMap<>();
|
||||||
Map<String, Object> pipelineConfig = new HashMap<>();
|
Map<String, Object> pipelineConfig = new HashMap<>();
|
||||||
pipelineConfig.put("description", "_description");
|
pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description");
|
||||||
pipelineConfig.put("processors", Collections.singletonList(Collections.singletonMap("test", processorConfig)));
|
pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig)));
|
||||||
pipelineConfig.put("on_failure", Collections.singletonList(Collections.singletonMap("test", processorConfig)));
|
pipelineConfig.put(Pipeline.ON_FAILURE_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig)));
|
||||||
Pipeline.Factory factory = new Pipeline.Factory();
|
Pipeline.Factory factory = new Pipeline.Factory();
|
||||||
Map<String, Processor.Factory> processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory());
|
Map<String, Processor.Factory> processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory());
|
||||||
Pipeline pipeline = factory.create("_id", pipelineConfig, processorRegistry);
|
Pipeline pipeline = factory.create("_id", pipelineConfig, processorRegistry);
|
||||||
|
@ -66,8 +67,8 @@ public class PipelineFactoryTests extends ESTestCase {
|
||||||
Map<String, Object> processorConfig = new HashMap<>();
|
Map<String, Object> processorConfig = new HashMap<>();
|
||||||
processorConfig.put("unused", "value");
|
processorConfig.put("unused", "value");
|
||||||
Map<String, Object> pipelineConfig = new HashMap<>();
|
Map<String, Object> pipelineConfig = new HashMap<>();
|
||||||
pipelineConfig.put("description", "_description");
|
pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description");
|
||||||
pipelineConfig.put("processors", Collections.singletonList(Collections.singletonMap("test", processorConfig)));
|
pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig)));
|
||||||
Pipeline.Factory factory = new Pipeline.Factory();
|
Pipeline.Factory factory = new Pipeline.Factory();
|
||||||
Map<String, Processor.Factory> processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory());
|
Map<String, Processor.Factory> processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory());
|
||||||
try {
|
try {
|
||||||
|
@ -79,11 +80,11 @@ public class PipelineFactoryTests extends ESTestCase {
|
||||||
|
|
||||||
public void testCreateProcessorsWithOnFailureProperties() throws Exception {
|
public void testCreateProcessorsWithOnFailureProperties() throws Exception {
|
||||||
Map<String, Object> processorConfig = new HashMap<>();
|
Map<String, Object> processorConfig = new HashMap<>();
|
||||||
processorConfig.put("on_failure", Collections.singletonList(Collections.singletonMap("test", new HashMap<>())));
|
processorConfig.put(Pipeline.ON_FAILURE_KEY, Collections.singletonList(Collections.singletonMap("test", new HashMap<>())));
|
||||||
|
|
||||||
Map<String, Object> pipelineConfig = new HashMap<>();
|
Map<String, Object> pipelineConfig = new HashMap<>();
|
||||||
pipelineConfig.put("description", "_description");
|
pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description");
|
||||||
pipelineConfig.put("processors", Collections.singletonList(Collections.singletonMap("test", processorConfig)));
|
pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig)));
|
||||||
Pipeline.Factory factory = new Pipeline.Factory();
|
Pipeline.Factory factory = new Pipeline.Factory();
|
||||||
Map<String, Processor.Factory> processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory());
|
Map<String, Processor.Factory> processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory());
|
||||||
Pipeline pipeline = factory.create("_id", pipelineConfig, processorRegistry);
|
Pipeline pipeline = factory.create("_id", pipelineConfig, processorRegistry);
|
Loading…
Reference in New Issue