Simplify processor creation from map of maps by folding the build and builder factory in one interface called Factory.

In tests processors can be created from the their constructors instead of builders.
In the IngestModule, register instances instead of class instances.
This commit is contained in:
Martijn van Groningen 2015-10-28 18:15:01 +07:00
parent 20d4253df5
commit 1a4b5bba2b
11 changed files with 94 additions and 257 deletions

View File

@ -23,10 +23,7 @@ package org.elasticsearch.ingest;
import org.elasticsearch.ingest.processor.Processor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.*;
/**
* A pipeline is a list of {@link Processor} instances grouped under a unique id.
@ -37,7 +34,7 @@ public final class Pipeline {
private final String description;
private final List<Processor> processors;
private Pipeline(String id, String description, List<Processor> processors) {
public Pipeline(String id, String description, List<Processor> processors) {
this.id = id;
this.description = description;
this.processors = processors;
@ -73,47 +70,27 @@ public final class Pipeline {
return processors;
}
public final static class Builder {
public final static class Factory {
private final String id;
private String description;
private List<Processor> processors = new ArrayList<>();
public Builder(String id) {
this.id = id;
}
public void fromMap(Map<String, Object> config, Map<String, Processor.Builder.Factory> processorRegistry) throws IOException {
description = (String) config.get("description");
public Pipeline create(String id, Map<String, Object> config, Map<String, Processor.Factory> processorRegistry) throws IOException {
String description = (String) config.get("description");
List<Processor> processors = new ArrayList<>();
@SuppressWarnings("unchecked")
List<Map<String, Map<String, Object>>> processors = (List<Map<String, Map<String, Object>>>) config.get("processors");
if (processors != null ) {
for (Map<String, Map<String, Object>> processor : processors) {
List<Map<String, Map<String, Object>>> processorConfigs = (List<Map<String, Map<String, Object>>>) config.get("processors");
if (processorConfigs != null ) {
for (Map<String, Map<String, Object>> processor : processorConfigs) {
for (Map.Entry<String, Map<String, Object>> entry : processor.entrySet()) {
Processor.Builder builder = processorRegistry.get(entry.getKey()).create();
if (builder != null) {
builder.fromMap(entry.getValue());
this.processors.add(builder.build());
Processor.Factory factory = processorRegistry.get(entry.getKey());
if (factory != null) {
processors.add(factory.create(entry.getValue()));
} else {
throw new IllegalArgumentException("No processor type exist with name [" + entry.getKey() + "]");
}
}
}
}
}
public void setDescription(String description) {
this.description = description;
}
public void addProcessors(Processor.Builder... processors) throws IOException {
for (Processor.Builder processor : processors) {
this.processors.add(processor.build());
}
}
public Pipeline build() {
return new Pipeline(id, description, Collections.unmodifiableList(processors));
}
}
}

View File

@ -39,40 +39,24 @@ public interface Processor {
void execute(Data data);
/**
* A builder to construct a processor to be used in a pipeline.
* A factory that knows how to construct a processor based on a map of maps.
*/
interface Builder {
interface Factory extends Closeable {
/**
* A general way to set processor related settings based on the config map.
* Creates a processor based on the specified map of maps config
*/
void fromMap(Map<String, Object> config);
Processor create(Map<String, Object> config) throws IOException;
/**
* Builds the processor based on previous set settings.
*/
Processor build() throws IOException;
/**
* A factory that creates a processor builder when processor instances for pipelines are being created.
*/
interface Factory extends Closeable {
/**
* Creates the builder.
*/
Builder create();
/**
*/
default void setConfigDirectory(Path configDirectory) {
}
@Override
default void close() throws IOException {
}
default void setConfigDirectory(Path configDirectory) {
}
@Override
default void close() throws IOException {
}
}
}

View File

@ -145,47 +145,23 @@ public final class GeoIpProcessor implements Processor {
return geoData;
}
public static class Builder implements Processor.Builder {
public static class Factory implements Processor.Factory {
private final Path geoIpConfigDirectory;
private final DatabaseReaderService databaseReaderService;
private Path geoIpConfigDirectory;
private final DatabaseReaderService databaseReaderService = new DatabaseReaderService();
private String ipField;
private String databaseFile = "GeoLite2-City.mmdb";
private String targetField = "geoip";
public Builder(Path geoIpConfigDirectory, DatabaseReaderService databaseReaderService) {
this.geoIpConfigDirectory = geoIpConfigDirectory;
this.databaseReaderService = databaseReaderService;
}
public void setIpField(String ipField) {
this.ipField = ipField;
}
public void setDatabaseFile(String dbPath) {
this.databaseFile = dbPath;
}
public void setTargetField(String targetField) {
this.targetField = targetField;
}
public void fromMap(Map<String, Object> config) {
this.ipField = (String) config.get("ip_field");
public Processor create(Map<String, Object> config) throws IOException {
String ipField = (String) config.get("ip_field");
String targetField = (String) config.get("target_field");
if (targetField != null) {
this.targetField = targetField;
if (targetField == null) {
targetField = "geoip";
}
String databaseFile = (String) config.get("database_file");
if (databaseFile != null) {
this.databaseFile = databaseFile;
if (databaseFile == null) {
databaseFile = "GeoLite2-City.mmdb";
}
}
@Override
public Processor build() throws IOException {
Path databasePath = geoIpConfigDirectory.resolve(databaseFile);
if (Files.exists(databasePath)) {
try (InputStream database = Files.newInputStream(databasePath, StandardOpenOption.READ)) {
@ -197,27 +173,15 @@ public final class GeoIpProcessor implements Processor {
}
}
public static class Factory implements Processor.Builder.Factory {
private Path geoIpConfigDirectory;
private final DatabaseReaderService databaseReaderService = new DatabaseReaderService();
@Override
public Processor.Builder create() {
return new Builder(geoIpConfigDirectory, databaseReaderService);
}
@Override
public void setConfigDirectory(Path configDirectory) {
geoIpConfigDirectory = configDirectory.resolve("ingest").resolve("geoip");
}
@Override
public void close() throws IOException {
databaseReaderService.close();
}
@Override
public void setConfigDirectory(Path configDirectory) {
geoIpConfigDirectory = configDirectory.resolve("ingest").resolve("geoip");
}
@Override
public void close() throws IOException {
databaseReaderService.close();
}
}
}

View File

@ -36,12 +36,10 @@ public final class GrokProcessor implements Processor {
public static final String TYPE = "grok";
private final String matchField;
private final String matchPattern;
private final Grok grok;
public GrokProcessor(Grok grok, String matchField, String matchPattern) throws IOException {
public GrokProcessor(Grok grok, String matchField) throws IOException {
this.matchField = matchField;
this.matchPattern = matchPattern;
this.grok = grok;
}
@ -57,31 +55,12 @@ public final class GrokProcessor implements Processor {
}
}
public static class Builder implements Processor.Builder {
public static class Factory implements Processor.Factory {
private Path grokConfigDirectory;
private String matchField;
private String matchPattern;
public Builder(Path grokConfigDirectory) {
this.grokConfigDirectory = grokConfigDirectory;
}
public void setMatchField(String matchField) {
this.matchField = matchField;
}
public void setMatchPattern(String matchPattern) {
this.matchPattern = matchPattern;
}
public void fromMap(Map<String, Object> config) {
this.matchField = (String) config.get("field");
this.matchPattern = (String) config.get("pattern");
}
@Override
public Processor build() throws IOException {
public Processor create(Map<String, Object> config) throws IOException {
String matchField = (String) config.get("field");
String matchPattern = (String) config.get("pattern");
Map<String, String> patternBank = new HashMap<>();
Path patternsDirectory = grokConfigDirectory.resolve("patterns");
try (DirectoryStream<Path> stream = Files.newDirectoryStream(patternsDirectory)) {
@ -93,22 +72,13 @@ public final class GrokProcessor implements Processor {
}
Grok grok = new Grok(patternBank, matchPattern);
return new GrokProcessor(grok, matchField, matchPattern);
return new GrokProcessor(grok, matchField);
}
public static class Factory implements Processor.Builder.Factory {
private Path grokConfigDirectory;
@Override
public Processor.Builder create() {
return new Builder(grokConfigDirectory);
}
@Override
public void setConfigDirectory(Path configDirectory) {
this.grokConfigDirectory = configDirectory.resolve("ingest").resolve("grok");
}
@Override
public void setConfigDirectory(Path configDirectory) {
this.grokConfigDirectory = configDirectory.resolve("ingest").resolve("grok");
}
}
}

View File

@ -52,50 +52,16 @@ public final class SimpleProcessor implements Processor {
}
}
public static class Builder implements Processor.Builder {
public static class Factory implements Processor.Factory {
private String path;
private String expectedValue;
private String addField;
private String addFieldValue;
public void setPath(String path) {
this.path = path;
}
public void setExpectedValue(String value) {
this.expectedValue = value;
}
public void setAddField(String addField) {
this.addField = addField;
}
public void setAddFieldValue(String addFieldValue) {
this.addFieldValue = addFieldValue;
}
public void fromMap(Map<String, Object> config) {
this.path = (String) config.get("path");
this.expectedValue = (String) config.get("expected_value");
this.addField = (String) config.get("add_field");
this.addFieldValue = (String) config.get("add_field_value");
}
@Override
public Processor build() {
public Processor create(Map<String, Object> config) {
String path = (String) config.get("path");
String expectedValue = (String) config.get("expected_value");
String addField = (String) config.get("add_field");
String addFieldValue = (String) config.get("add_field_value");
return new SimpleProcessor(path, expectedValue, addField, addFieldValue);
}
public static class Factory implements Processor.Builder.Factory {
@Override
public Processor.Builder create() {
return new Builder();
}
}
}
}

View File

@ -32,7 +32,7 @@ import java.util.Map;
public class IngestModule extends AbstractModule {
private final Map<String, Class<? extends Processor.Builder.Factory>> processors = new HashMap<>();
private final Map<String, Processor.Factory> processors = new HashMap<>();
@Override
protected void configure() {
@ -41,18 +41,21 @@ public class IngestModule extends AbstractModule {
binder().bind(PipelineStore.class).asEagerSingleton();
binder().bind(PipelineStoreClient.class).asEagerSingleton();
registerProcessor(SimpleProcessor.TYPE, SimpleProcessor.Builder.Factory.class);
registerProcessor(GeoIpProcessor.TYPE, GeoIpProcessor.Builder.Factory.class);
registerProcessor(GrokProcessor.TYPE, GrokProcessor.Builder.Factory.class);
addProcessor(SimpleProcessor.TYPE, new SimpleProcessor.Factory());
addProcessor(GeoIpProcessor.TYPE, new GeoIpProcessor.Factory());
addProcessor(GrokProcessor.TYPE, new GrokProcessor.Factory());
MapBinder<String, Processor.Builder.Factory> mapBinder = MapBinder.newMapBinder(binder(), String.class, Processor.Builder.Factory.class);
for (Map.Entry<String, Class<? extends Processor.Builder.Factory>> entry : processors.entrySet()) {
mapBinder.addBinding(entry.getKey()).to(entry.getValue());
MapBinder<String, Processor.Factory> mapBinder = MapBinder.newMapBinder(binder(), String.class, Processor.Factory.class);
for (Map.Entry<String, Processor.Factory> entry : processors.entrySet()) {
mapBinder.addBinding(entry.getKey()).toInstance(entry.getValue());
}
}
public void registerProcessor(String processorType, Class<? extends Processor.Builder.Factory> processorFactory) {
processors.put(processorType, processorFactory);
/**
* Adds a processor factory under a specific type name.
*/
public void addProcessor(String type, Processor.Factory factory) {
processors.put(type, factory);
}
}

View File

@ -48,18 +48,19 @@ public class PipelineStore extends AbstractLifecycleComponent {
private final ClusterService clusterService;
private final TimeValue pipelineUpdateInterval;
private final PipelineStoreClient client;
private final Map<String, Processor.Builder.Factory> processorFactoryRegistry;
private final Pipeline.Factory factory = new Pipeline.Factory();
private final Map<String, Processor.Factory> processorFactoryRegistry;
private volatile Map<String, PipelineReference> pipelines = new HashMap<>();
@Inject
public PipelineStore(Settings settings, ThreadPool threadPool, Environment environment, ClusterService clusterService, PipelineStoreClient client, Map<String, Processor.Builder.Factory> processors) {
public PipelineStore(Settings settings, ThreadPool threadPool, Environment environment, ClusterService clusterService, PipelineStoreClient client, Map<String, Processor.Factory> processors) {
super(settings);
this.threadPool = threadPool;
this.clusterService = clusterService;
this.pipelineUpdateInterval = settings.getAsTime("ingest.pipeline.store.update.interval", TimeValue.timeValueSeconds(1));
this.client = client;
for (Processor.Builder.Factory factory : processors.values()) {
for (Processor.Factory factory : processors.values()) {
factory.setConfigDirectory(environment.configFile());
}
this.processorFactoryRegistry = Collections.unmodifiableMap(processors);
@ -76,7 +77,7 @@ public class PipelineStore extends AbstractLifecycleComponent {
@Override
protected void doClose() {
for (Processor.Builder.Factory factory : processorFactoryRegistry.values()) {
for (Processor.Factory factory : processorFactoryRegistry.values()) {
try {
factory.close();
} catch (IOException e) {
@ -130,9 +131,8 @@ public class PipelineStore extends AbstractLifecycleComponent {
}
changed++;
Pipeline.Builder builder = new Pipeline.Builder(hit.getId());
builder.fromMap(hit.sourceAsMap(), processorFactoryRegistry);
newPipelines.put(pipelineId, new PipelineReference(builder.build(), hit.getVersion(), pipelineSource));
Pipeline pipeline = factory.create(hit.getId(), hit.sourceAsMap(), processorFactoryRegistry);
newPipelines.put(pipelineId, new PipelineReference(pipeline, hit.getVersion(), pipelineSource));
}
int removed = 0;

View File

@ -30,36 +30,38 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
public class GeoProcessorBuilderTests extends ESTestCase {
public class GeoProcessorFactoryTests extends ESTestCase {
private Path geoIpConfigDir;
private Path configDir;
@Before
public void prepareConfigDirectory() throws Exception {
geoIpConfigDir = createTempDir();
this.configDir = createTempDir();
Path geoIpConfigDir = configDir.resolve("ingest").resolve("geoip");
Files.createDirectories(geoIpConfigDir);
Files.copy(new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-City.mmdb")), geoIpConfigDir.resolve("GeoLite2-City.mmdb"));
Files.copy(new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-Country.mmdb")), geoIpConfigDir.resolve("GeoLite2-Country.mmdb"));
}
public void testBuild_defaults() throws Exception {
GeoIpProcessor.Builder builder = new GeoIpProcessor.Builder(geoIpConfigDir, new DatabaseReaderService());
builder.fromMap(Collections.emptyMap());
GeoIpProcessor processor = (GeoIpProcessor) builder.build();
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory();
factory.setConfigDirectory(configDir);
GeoIpProcessor processor = (GeoIpProcessor) factory.create(Collections.emptyMap());
assertThat(processor.dbReader.getMetadata().getDatabaseType(), equalTo("GeoLite2-City"));
}
public void testBuild_dbFile() throws Exception {
GeoIpProcessor.Builder builder = new GeoIpProcessor.Builder(geoIpConfigDir, new DatabaseReaderService());
builder.fromMap(Collections.singletonMap("database_file", "GeoLite2-Country.mmdb"));
GeoIpProcessor processor = (GeoIpProcessor) builder.build();
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory();
factory.setConfigDirectory(configDir);
GeoIpProcessor processor = (GeoIpProcessor) factory.create(Collections.singletonMap("database_file", "GeoLite2-Country.mmdb"));
assertThat(processor.dbReader.getMetadata().getDatabaseType(), equalTo("GeoLite2-Country"));
}
public void testBuild_nonExistingDbFile() throws Exception {
GeoIpProcessor.Builder builder = new GeoIpProcessor.Builder(geoIpConfigDir, new DatabaseReaderService());
builder.fromMap(Collections.singletonMap("database_file", "does-not-exist.mmdb"));
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory();
factory.setConfigDirectory(configDir);
try {
builder.build();
factory.create(Collections.singletonMap("database_file", "does-not-exist.mmdb"));
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), startsWith("database file [does-not-exist.mmdb] doesn't exist in"));
}

View File

@ -28,8 +28,8 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.junit.After;
import org.junit.Before;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.*;
@ -67,20 +67,8 @@ public class PipelineExecutionServiceTests extends ESTestCase {
}
public void testExecute_success() throws Exception {
Pipeline.Builder builder = new Pipeline.Builder("_id");
Processor processor = mock(Processor.class);
builder.addProcessors(new Processor.Builder() {
@Override
public void fromMap(Map<String, Object> config) {
}
@Override
public Processor build() {
return processor;
}
});
when(store.get("_id")).thenReturn(builder.build());
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", Arrays.asList(processor)));
Data data = new Data("_index", "_type", "_id", Collections.emptyMap());
PipelineExecutionService.Listener listener = mock(PipelineExecutionService.Listener.class);
@ -96,20 +84,8 @@ public class PipelineExecutionServiceTests extends ESTestCase {
}
public void testExecute_failure() throws Exception {
Pipeline.Builder builder = new Pipeline.Builder("_id");
Processor processor = mock(Processor.class);
builder.addProcessors(new Processor.Builder() {
@Override
public void fromMap(Map<String, Object> config) {
}
@Override
public Processor build() {
return processor;
}
});
when(store.get("_id")).thenReturn(builder.build());
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", Arrays.asList(processor)));
Data data = new Data("_index", "_type", "_id", Collections.emptyMap());
doThrow(new RuntimeException()).when(processor).execute(data);
PipelineExecutionService.Listener listener = mock(PipelineExecutionService.Listener.class);

View File

@ -57,7 +57,7 @@ public class PipelineStoreTests extends ESTestCase {
ClusterService clusterService = mock(ClusterService.class);
client = mock(PipelineStoreClient.class);
Environment environment = mock(Environment.class);
store = new PipelineStore(Settings.EMPTY, threadPool, environment, clusterService, client, Collections.singletonMap(SimpleProcessor.TYPE, new SimpleProcessor.Builder.Factory()));
store = new PipelineStore(Settings.EMPTY, threadPool, environment, clusterService, client, Collections.singletonMap(SimpleProcessor.TYPE, new SimpleProcessor.Factory()));
store.start();
}

View File

@ -39,6 +39,8 @@ import org.junit.Before;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.util.Arrays;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
@ -162,14 +164,7 @@ public class IngestActionFilterTests extends ESTestCase {
.build()
);
PipelineStore store = mock(PipelineStore.class);
Pipeline.Builder pipelineBuilder = new Pipeline.Builder("_id");
SimpleProcessor.Builder processorBuilder = new SimpleProcessor.Builder();
processorBuilder.setPath("field1");
processorBuilder.setExpectedValue("value1");
processorBuilder.setAddField("field2");
processorBuilder.setAddFieldValue("value2");
pipelineBuilder.addProcessors(processorBuilder);
when(store.get("_id")).thenReturn(pipelineBuilder.build());
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", Arrays.asList(new SimpleProcessor("field1", "value1", "field2", "value2"))));
executionService = new PipelineExecutionService(store, threadPool);
filter = new IngestActionFilter(Settings.EMPTY, executionService);