From 1a4b5bba2bc42fb1be0003cb71371062fedc24ff Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Wed, 28 Oct 2015 18:15:01 +0700 Subject: [PATCH] Simplify processor creation from map of maps by folding the build and builder factory in one interface called Factory. In tests processors can be created from the their constructors instead of builders. In the IngestModule, register instances instead of class instances. --- .../org/elasticsearch/ingest/Pipeline.java | 49 ++++--------- .../ingest/processor/Processor.java | 34 +++------- .../processor/geoip/GeoIpProcessor.java | 68 +++++-------------- .../ingest/processor/grok/GrokProcessor.java | 50 +++----------- .../processor/simple/SimpleProcessor.java | 46 ++----------- .../plugin/ingest/IngestModule.java | 21 +++--- .../plugin/ingest/PipelineStore.java | 14 ++-- ...sts.java => GeoProcessorFactoryTests.java} | 26 +++---- .../ingest/PipelineExecutionServiceTests.java | 30 +------- .../plugin/ingest/PipelineStoreTests.java | 2 +- .../transport/IngestActionFilterTests.java | 11 +-- 11 files changed, 94 insertions(+), 257 deletions(-) rename plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/geoip/{GeoProcessorBuilderTests.java => GeoProcessorFactoryTests.java} (69%) diff --git a/plugins/ingest/src/main/java/org/elasticsearch/ingest/Pipeline.java b/plugins/ingest/src/main/java/org/elasticsearch/ingest/Pipeline.java index b91ef2f0f7e..f318de721f6 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/ingest/Pipeline.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/ingest/Pipeline.java @@ -23,10 +23,7 @@ package org.elasticsearch.ingest; import org.elasticsearch.ingest.processor.Processor; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; +import java.util.*; /** * A pipeline is a list of {@link Processor} instances grouped under a unique id. @@ -37,7 +34,7 @@ public final class Pipeline { private final String description; private final List processors; - private Pipeline(String id, String description, List processors) { + public Pipeline(String id, String description, List processors) { this.id = id; this.description = description; this.processors = processors; @@ -73,47 +70,27 @@ public final class Pipeline { return processors; } - public final static class Builder { + public final static class Factory { - private final String id; - private String description; - private List processors = new ArrayList<>(); - - public Builder(String id) { - this.id = id; - } - - public void fromMap(Map config, Map processorRegistry) throws IOException { - description = (String) config.get("description"); + public Pipeline create(String id, Map config, Map processorRegistry) throws IOException { + String description = (String) config.get("description"); + List processors = new ArrayList<>(); @SuppressWarnings("unchecked") - List>> processors = (List>>) config.get("processors"); - if (processors != null ) { - for (Map> processor : processors) { + List>> processorConfigs = (List>>) config.get("processors"); + if (processorConfigs != null ) { + for (Map> processor : processorConfigs) { for (Map.Entry> entry : processor.entrySet()) { - Processor.Builder builder = processorRegistry.get(entry.getKey()).create(); - if (builder != null) { - builder.fromMap(entry.getValue()); - this.processors.add(builder.build()); + Processor.Factory factory = processorRegistry.get(entry.getKey()); + if (factory != null) { + processors.add(factory.create(entry.getValue())); } else { throw new IllegalArgumentException("No processor type exist with name [" + entry.getKey() + "]"); } } } } - } - - public void setDescription(String description) { - this.description = description; - } - - public void addProcessors(Processor.Builder... processors) throws IOException { - for (Processor.Builder processor : processors) { - this.processors.add(processor.build()); - } - } - - public Pipeline build() { return new Pipeline(id, description, Collections.unmodifiableList(processors)); } + } } diff --git a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/Processor.java b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/Processor.java index 6efb3ce06f4..9916e24a587 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/Processor.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/Processor.java @@ -39,40 +39,24 @@ public interface Processor { void execute(Data data); /** - * A builder to construct a processor to be used in a pipeline. + * A factory that knows how to construct a processor based on a map of maps. */ - interface Builder { + interface Factory extends Closeable { /** - * A general way to set processor related settings based on the config map. + * Creates a processor based on the specified map of maps config */ - void fromMap(Map config); + Processor create(Map config) throws IOException; /** - * Builds the processor based on previous set settings. */ - Processor build() throws IOException; - - /** - * A factory that creates a processor builder when processor instances for pipelines are being created. - */ - interface Factory extends Closeable { - - /** - * Creates the builder. - */ - Builder create(); - - /** - */ - default void setConfigDirectory(Path configDirectory) { - } - - @Override - default void close() throws IOException { - } + default void setConfigDirectory(Path configDirectory) { } + @Override + default void close() throws IOException { + } + } } diff --git a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/geoip/GeoIpProcessor.java b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/geoip/GeoIpProcessor.java index 103158d4696..482c71c9119 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/geoip/GeoIpProcessor.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/geoip/GeoIpProcessor.java @@ -145,47 +145,23 @@ public final class GeoIpProcessor implements Processor { return geoData; } - public static class Builder implements Processor.Builder { + public static class Factory implements Processor.Factory { - private final Path geoIpConfigDirectory; - private final DatabaseReaderService databaseReaderService; + private Path geoIpConfigDirectory; + private final DatabaseReaderService databaseReaderService = new DatabaseReaderService(); - private String ipField; - private String databaseFile = "GeoLite2-City.mmdb"; - private String targetField = "geoip"; - - public Builder(Path geoIpConfigDirectory, DatabaseReaderService databaseReaderService) { - this.geoIpConfigDirectory = geoIpConfigDirectory; - this.databaseReaderService = databaseReaderService; - } - - public void setIpField(String ipField) { - this.ipField = ipField; - } - - public void setDatabaseFile(String dbPath) { - this.databaseFile = dbPath; - } - - public void setTargetField(String targetField) { - this.targetField = targetField; - } - - public void fromMap(Map config) { - this.ipField = (String) config.get("ip_field"); + public Processor create(Map config) throws IOException { + String ipField = (String) config.get("ip_field"); String targetField = (String) config.get("target_field"); - if (targetField != null) { - this.targetField = targetField; + if (targetField == null) { + targetField = "geoip"; } String databaseFile = (String) config.get("database_file"); - if (databaseFile != null) { - this.databaseFile = databaseFile; + if (databaseFile == null) { + databaseFile = "GeoLite2-City.mmdb"; } - } - @Override - public Processor build() throws IOException { Path databasePath = geoIpConfigDirectory.resolve(databaseFile); if (Files.exists(databasePath)) { try (InputStream database = Files.newInputStream(databasePath, StandardOpenOption.READ)) { @@ -197,27 +173,15 @@ public final class GeoIpProcessor implements Processor { } } - public static class Factory implements Processor.Builder.Factory { - - private Path geoIpConfigDirectory; - private final DatabaseReaderService databaseReaderService = new DatabaseReaderService(); - - @Override - public Processor.Builder create() { - return new Builder(geoIpConfigDirectory, databaseReaderService); - } - - @Override - public void setConfigDirectory(Path configDirectory) { - geoIpConfigDirectory = configDirectory.resolve("ingest").resolve("geoip"); - } - - @Override - public void close() throws IOException { - databaseReaderService.close(); - } + @Override + public void setConfigDirectory(Path configDirectory) { + geoIpConfigDirectory = configDirectory.resolve("ingest").resolve("geoip"); } + @Override + public void close() throws IOException { + databaseReaderService.close(); + } } } diff --git a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/grok/GrokProcessor.java b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/grok/GrokProcessor.java index 87d6e9b4a11..9c1dcb44da6 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/grok/GrokProcessor.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/grok/GrokProcessor.java @@ -36,12 +36,10 @@ public final class GrokProcessor implements Processor { public static final String TYPE = "grok"; private final String matchField; - private final String matchPattern; private final Grok grok; - public GrokProcessor(Grok grok, String matchField, String matchPattern) throws IOException { + public GrokProcessor(Grok grok, String matchField) throws IOException { this.matchField = matchField; - this.matchPattern = matchPattern; this.grok = grok; } @@ -57,31 +55,12 @@ public final class GrokProcessor implements Processor { } } - public static class Builder implements Processor.Builder { - + public static class Factory implements Processor.Factory { private Path grokConfigDirectory; - private String matchField; - private String matchPattern; - public Builder(Path grokConfigDirectory) { - this.grokConfigDirectory = grokConfigDirectory; - } - - public void setMatchField(String matchField) { - this.matchField = matchField; - } - - public void setMatchPattern(String matchPattern) { - this.matchPattern = matchPattern; - } - - public void fromMap(Map config) { - this.matchField = (String) config.get("field"); - this.matchPattern = (String) config.get("pattern"); - } - - @Override - public Processor build() throws IOException { + public Processor create(Map config) throws IOException { + String matchField = (String) config.get("field"); + String matchPattern = (String) config.get("pattern"); Map patternBank = new HashMap<>(); Path patternsDirectory = grokConfigDirectory.resolve("patterns"); try (DirectoryStream stream = Files.newDirectoryStream(patternsDirectory)) { @@ -93,22 +72,13 @@ public final class GrokProcessor implements Processor { } Grok grok = new Grok(patternBank, matchPattern); - return new GrokProcessor(grok, matchField, matchPattern); + return new GrokProcessor(grok, matchField); } - public static class Factory implements Processor.Builder.Factory { - private Path grokConfigDirectory; - - @Override - public Processor.Builder create() { - return new Builder(grokConfigDirectory); - } - - @Override - public void setConfigDirectory(Path configDirectory) { - this.grokConfigDirectory = configDirectory.resolve("ingest").resolve("grok"); - } + @Override + public void setConfigDirectory(Path configDirectory) { + this.grokConfigDirectory = configDirectory.resolve("ingest").resolve("grok"); } - } + } diff --git a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/simple/SimpleProcessor.java b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/simple/SimpleProcessor.java index 2892f0e6250..94d8a3bdc0f 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/simple/SimpleProcessor.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/simple/SimpleProcessor.java @@ -52,50 +52,16 @@ public final class SimpleProcessor implements Processor { } } - public static class Builder implements Processor.Builder { + public static class Factory implements Processor.Factory { - private String path; - private String expectedValue; - private String addField; - private String addFieldValue; - - public void setPath(String path) { - this.path = path; - } - - public void setExpectedValue(String value) { - this.expectedValue = value; - } - - public void setAddField(String addField) { - this.addField = addField; - } - - public void setAddFieldValue(String addFieldValue) { - this.addFieldValue = addFieldValue; - } - - public void fromMap(Map config) { - this.path = (String) config.get("path"); - this.expectedValue = (String) config.get("expected_value"); - this.addField = (String) config.get("add_field"); - this.addFieldValue = (String) config.get("add_field_value"); - } - - @Override - public Processor build() { + public Processor create(Map config) { + String path = (String) config.get("path"); + String expectedValue = (String) config.get("expected_value"); + String addField = (String) config.get("add_field"); + String addFieldValue = (String) config.get("add_field_value"); return new SimpleProcessor(path, expectedValue, addField, addFieldValue); } - public static class Factory implements Processor.Builder.Factory { - - @Override - public Processor.Builder create() { - return new Builder(); - } - - } - } } diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestModule.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestModule.java index f8903cf5aa0..194716d0f58 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestModule.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestModule.java @@ -32,7 +32,7 @@ import java.util.Map; public class IngestModule extends AbstractModule { - private final Map> processors = new HashMap<>(); + private final Map processors = new HashMap<>(); @Override protected void configure() { @@ -41,18 +41,21 @@ public class IngestModule extends AbstractModule { binder().bind(PipelineStore.class).asEagerSingleton(); binder().bind(PipelineStoreClient.class).asEagerSingleton(); - registerProcessor(SimpleProcessor.TYPE, SimpleProcessor.Builder.Factory.class); - registerProcessor(GeoIpProcessor.TYPE, GeoIpProcessor.Builder.Factory.class); - registerProcessor(GrokProcessor.TYPE, GrokProcessor.Builder.Factory.class); + addProcessor(SimpleProcessor.TYPE, new SimpleProcessor.Factory()); + addProcessor(GeoIpProcessor.TYPE, new GeoIpProcessor.Factory()); + addProcessor(GrokProcessor.TYPE, new GrokProcessor.Factory()); - MapBinder mapBinder = MapBinder.newMapBinder(binder(), String.class, Processor.Builder.Factory.class); - for (Map.Entry> entry : processors.entrySet()) { - mapBinder.addBinding(entry.getKey()).to(entry.getValue()); + MapBinder mapBinder = MapBinder.newMapBinder(binder(), String.class, Processor.Factory.class); + for (Map.Entry entry : processors.entrySet()) { + mapBinder.addBinding(entry.getKey()).toInstance(entry.getValue()); } } - public void registerProcessor(String processorType, Class processorFactory) { - processors.put(processorType, processorFactory); + /** + * Adds a processor factory under a specific type name. + */ + public void addProcessor(String type, Processor.Factory factory) { + processors.put(type, factory); } } diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineStore.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineStore.java index 70cd45bad8d..38e406919c5 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineStore.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineStore.java @@ -48,18 +48,19 @@ public class PipelineStore extends AbstractLifecycleComponent { private final ClusterService clusterService; private final TimeValue pipelineUpdateInterval; private final PipelineStoreClient client; - private final Map processorFactoryRegistry; + private final Pipeline.Factory factory = new Pipeline.Factory(); + private final Map processorFactoryRegistry; private volatile Map pipelines = new HashMap<>(); @Inject - public PipelineStore(Settings settings, ThreadPool threadPool, Environment environment, ClusterService clusterService, PipelineStoreClient client, Map processors) { + public PipelineStore(Settings settings, ThreadPool threadPool, Environment environment, ClusterService clusterService, PipelineStoreClient client, Map processors) { super(settings); this.threadPool = threadPool; this.clusterService = clusterService; this.pipelineUpdateInterval = settings.getAsTime("ingest.pipeline.store.update.interval", TimeValue.timeValueSeconds(1)); this.client = client; - for (Processor.Builder.Factory factory : processors.values()) { + for (Processor.Factory factory : processors.values()) { factory.setConfigDirectory(environment.configFile()); } this.processorFactoryRegistry = Collections.unmodifiableMap(processors); @@ -76,7 +77,7 @@ public class PipelineStore extends AbstractLifecycleComponent { @Override protected void doClose() { - for (Processor.Builder.Factory factory : processorFactoryRegistry.values()) { + for (Processor.Factory factory : processorFactoryRegistry.values()) { try { factory.close(); } catch (IOException e) { @@ -130,9 +131,8 @@ public class PipelineStore extends AbstractLifecycleComponent { } changed++; - Pipeline.Builder builder = new Pipeline.Builder(hit.getId()); - builder.fromMap(hit.sourceAsMap(), processorFactoryRegistry); - newPipelines.put(pipelineId, new PipelineReference(builder.build(), hit.getVersion(), pipelineSource)); + Pipeline pipeline = factory.create(hit.getId(), hit.sourceAsMap(), processorFactoryRegistry); + newPipelines.put(pipelineId, new PipelineReference(pipeline, hit.getVersion(), pipelineSource)); } int removed = 0; diff --git a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/geoip/GeoProcessorBuilderTests.java b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/geoip/GeoProcessorFactoryTests.java similarity index 69% rename from plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/geoip/GeoProcessorBuilderTests.java rename to plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/geoip/GeoProcessorFactoryTests.java index 1d026e18df7..f681600db72 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/geoip/GeoProcessorBuilderTests.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/geoip/GeoProcessorFactoryTests.java @@ -30,36 +30,38 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.Collections; -public class GeoProcessorBuilderTests extends ESTestCase { +public class GeoProcessorFactoryTests extends ESTestCase { - private Path geoIpConfigDir; + private Path configDir; @Before public void prepareConfigDirectory() throws Exception { - geoIpConfigDir = createTempDir(); + this.configDir = createTempDir(); + Path geoIpConfigDir = configDir.resolve("ingest").resolve("geoip"); + Files.createDirectories(geoIpConfigDir); Files.copy(new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-City.mmdb")), geoIpConfigDir.resolve("GeoLite2-City.mmdb")); Files.copy(new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-Country.mmdb")), geoIpConfigDir.resolve("GeoLite2-Country.mmdb")); } public void testBuild_defaults() throws Exception { - GeoIpProcessor.Builder builder = new GeoIpProcessor.Builder(geoIpConfigDir, new DatabaseReaderService()); - builder.fromMap(Collections.emptyMap()); - GeoIpProcessor processor = (GeoIpProcessor) builder.build(); + GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(); + factory.setConfigDirectory(configDir); + GeoIpProcessor processor = (GeoIpProcessor) factory.create(Collections.emptyMap()); assertThat(processor.dbReader.getMetadata().getDatabaseType(), equalTo("GeoLite2-City")); } public void testBuild_dbFile() throws Exception { - GeoIpProcessor.Builder builder = new GeoIpProcessor.Builder(geoIpConfigDir, new DatabaseReaderService()); - builder.fromMap(Collections.singletonMap("database_file", "GeoLite2-Country.mmdb")); - GeoIpProcessor processor = (GeoIpProcessor) builder.build(); + GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(); + factory.setConfigDirectory(configDir); + GeoIpProcessor processor = (GeoIpProcessor) factory.create(Collections.singletonMap("database_file", "GeoLite2-Country.mmdb")); assertThat(processor.dbReader.getMetadata().getDatabaseType(), equalTo("GeoLite2-Country")); } public void testBuild_nonExistingDbFile() throws Exception { - GeoIpProcessor.Builder builder = new GeoIpProcessor.Builder(geoIpConfigDir, new DatabaseReaderService()); - builder.fromMap(Collections.singletonMap("database_file", "does-not-exist.mmdb")); + GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(); + factory.setConfigDirectory(configDir); try { - builder.build(); + factory.create(Collections.singletonMap("database_file", "does-not-exist.mmdb")); } catch (IllegalArgumentException e) { assertThat(e.getMessage(), startsWith("database file [does-not-exist.mmdb] doesn't exist in")); } diff --git a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/PipelineExecutionServiceTests.java b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/PipelineExecutionServiceTests.java index 7dfddb3e1cc..f970f24b237 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/PipelineExecutionServiceTests.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/PipelineExecutionServiceTests.java @@ -28,8 +28,8 @@ import org.elasticsearch.threadpool.ThreadPool; import org.junit.After; import org.junit.Before; +import java.util.Arrays; import java.util.Collections; -import java.util.Map; import static org.mockito.Matchers.any; import static org.mockito.Mockito.*; @@ -67,20 +67,8 @@ public class PipelineExecutionServiceTests extends ESTestCase { } public void testExecute_success() throws Exception { - Pipeline.Builder builder = new Pipeline.Builder("_id"); Processor processor = mock(Processor.class); - builder.addProcessors(new Processor.Builder() { - @Override - public void fromMap(Map config) { - } - - @Override - public Processor build() { - return processor; - } - }); - - when(store.get("_id")).thenReturn(builder.build()); + when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", Arrays.asList(processor))); Data data = new Data("_index", "_type", "_id", Collections.emptyMap()); PipelineExecutionService.Listener listener = mock(PipelineExecutionService.Listener.class); @@ -96,20 +84,8 @@ public class PipelineExecutionServiceTests extends ESTestCase { } public void testExecute_failure() throws Exception { - Pipeline.Builder builder = new Pipeline.Builder("_id"); Processor processor = mock(Processor.class); - builder.addProcessors(new Processor.Builder() { - @Override - public void fromMap(Map config) { - } - - @Override - public Processor build() { - return processor; - } - }); - - when(store.get("_id")).thenReturn(builder.build()); + when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", Arrays.asList(processor))); Data data = new Data("_index", "_type", "_id", Collections.emptyMap()); doThrow(new RuntimeException()).when(processor).execute(data); PipelineExecutionService.Listener listener = mock(PipelineExecutionService.Listener.class); diff --git a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/PipelineStoreTests.java b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/PipelineStoreTests.java index fc810fb7247..46271a1c4bb 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/PipelineStoreTests.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/PipelineStoreTests.java @@ -57,7 +57,7 @@ public class PipelineStoreTests extends ESTestCase { ClusterService clusterService = mock(ClusterService.class); client = mock(PipelineStoreClient.class); Environment environment = mock(Environment.class); - store = new PipelineStore(Settings.EMPTY, threadPool, environment, clusterService, client, Collections.singletonMap(SimpleProcessor.TYPE, new SimpleProcessor.Builder.Factory())); + store = new PipelineStore(Settings.EMPTY, threadPool, environment, clusterService, client, Collections.singletonMap(SimpleProcessor.TYPE, new SimpleProcessor.Factory())); store.start(); } diff --git a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/IngestActionFilterTests.java b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/IngestActionFilterTests.java index f338992d7c0..2f03831c001 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/IngestActionFilterTests.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/IngestActionFilterTests.java @@ -39,6 +39,8 @@ import org.junit.Before; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import java.util.Arrays; + import static org.hamcrest.Matchers.equalTo; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; @@ -162,14 +164,7 @@ public class IngestActionFilterTests extends ESTestCase { .build() ); PipelineStore store = mock(PipelineStore.class); - Pipeline.Builder pipelineBuilder = new Pipeline.Builder("_id"); - SimpleProcessor.Builder processorBuilder = new SimpleProcessor.Builder(); - processorBuilder.setPath("field1"); - processorBuilder.setExpectedValue("value1"); - processorBuilder.setAddField("field2"); - processorBuilder.setAddFieldValue("value2"); - pipelineBuilder.addProcessors(processorBuilder); - when(store.get("_id")).thenReturn(pipelineBuilder.build()); + when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", Arrays.asList(new SimpleProcessor("field1", "value1", "field2", "value2")))); executionService = new PipelineExecutionService(store, threadPool); filter = new IngestActionFilter(Settings.EMPTY, executionService);