Merge branch 'feature/ingest' into ingest/date
This commit is contained in:
commit
dbf5c96876
|
@ -41,14 +41,15 @@ public interface Processor {
|
|||
/**
|
||||
* A factory that knows how to construct a processor based on a map of maps.
|
||||
*/
|
||||
interface Factory extends Closeable {
|
||||
interface Factory<P extends Processor> extends Closeable {
|
||||
|
||||
/**
|
||||
* Creates a processor based on the specified map of maps config
|
||||
*/
|
||||
Processor create(Map<String, Object> config) throws IOException;
|
||||
P create(Map<String, Object> config) throws IOException;
|
||||
|
||||
/**
|
||||
* Sets the configuration directory when needed to read additional config files
|
||||
*/
|
||||
default void setConfigDirectory(Path configDirectory) {
|
||||
}
|
||||
|
|
|
@ -27,14 +27,13 @@ import com.maxmind.geoip2.record.*;
|
|||
import org.elasticsearch.SpecialPermission;
|
||||
import org.elasticsearch.common.network.NetworkAddress;
|
||||
import org.elasticsearch.ingest.Data;
|
||||
import org.elasticsearch.ingest.processor.ConfigurationUtils;
|
||||
import org.elasticsearch.ingest.processor.Processor;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.InetAddress;
|
||||
import java.net.UnknownHostException;
|
||||
import java.nio.file.Files;
|
||||
import java.io.IOException;
|
||||
import java.net.InetAddress;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.StandardOpenOption;
|
||||
import java.security.AccessController;
|
||||
|
@ -159,12 +158,12 @@ public final class GeoIpProcessor implements Processor {
|
|||
return geoData;
|
||||
}
|
||||
|
||||
public static class Factory implements Processor.Factory {
|
||||
public static class Factory implements Processor.Factory<GeoIpProcessor> {
|
||||
|
||||
private Path geoIpConfigDirectory;
|
||||
private final DatabaseReaderService databaseReaderService = new DatabaseReaderService();
|
||||
|
||||
public Processor create(Map<String, Object> config) throws IOException {
|
||||
public GeoIpProcessor create(Map<String, Object> config) throws IOException {
|
||||
String ipField = readStringProperty(config, "ip_field", null);
|
||||
String targetField = readStringProperty(config, "target_field", "geoip");
|
||||
String databaseFile = readStringProperty(config, "database_file", "GeoLite2-City.mmdb");
|
||||
|
|
|
@ -64,10 +64,10 @@ public final class GrokProcessor implements Processor {
|
|||
return grok;
|
||||
}
|
||||
|
||||
public static class Factory implements Processor.Factory {
|
||||
public static class Factory implements Processor.Factory<GrokProcessor> {
|
||||
private Path grokConfigDirectory;
|
||||
|
||||
public Processor create(Map<String, Object> config) throws IOException {
|
||||
public GrokProcessor create(Map<String, Object> config) throws IOException {
|
||||
String matchField = ConfigurationUtils.readStringProperty(config, "field", null);
|
||||
String matchPattern = ConfigurationUtils.readStringProperty(config, "pattern", null);
|
||||
Map<String, String> patternBank = new HashMap<>();
|
||||
|
|
|
@ -23,7 +23,6 @@ import org.elasticsearch.ingest.Data;
|
|||
import org.elasticsearch.ingest.processor.ConfigurationUtils;
|
||||
import org.elasticsearch.ingest.processor.Processor;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
|
||||
public final class SimpleProcessor implements Processor {
|
||||
|
@ -53,9 +52,9 @@ public final class SimpleProcessor implements Processor {
|
|||
}
|
||||
}
|
||||
|
||||
public static class Factory implements Processor.Factory {
|
||||
public static class Factory implements Processor.Factory<SimpleProcessor> {
|
||||
|
||||
public Processor create(Map<String, Object> config) {
|
||||
public SimpleProcessor create(Map<String, Object> config) {
|
||||
String path = ConfigurationUtils.readStringProperty(config, "path", null);
|
||||
String expectedValue = ConfigurationUtils.readStringProperty(config, "expected_value", null);
|
||||
String addField = ConfigurationUtils.readStringProperty(config, "add_field", null);
|
||||
|
|
|
@ -23,16 +23,16 @@ import org.elasticsearch.test.ESTestCase;
|
|||
import org.elasticsearch.test.StreamsUtils;
|
||||
import org.junit.Before;
|
||||
|
||||
import static org.hamcrest.Matchers.*;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
public class GeoProcessorFactoryTests extends ESTestCase {
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.startsWith;
|
||||
|
||||
public class GeoIpProcessorFactoryTests extends ESTestCase {
|
||||
|
||||
private Path configDir;
|
||||
|
||||
|
@ -52,7 +52,7 @@ public class GeoProcessorFactoryTests extends ESTestCase {
|
|||
Map<String, Object> config = new HashMap<>();
|
||||
config.put("ip_field", "_field");
|
||||
|
||||
GeoIpProcessor processor = (GeoIpProcessor) factory.create(config);
|
||||
GeoIpProcessor processor = factory.create(config);
|
||||
assertThat(processor.getIpField(), equalTo("_field"));
|
||||
assertThat(processor.getTargetField(), equalTo("geoip"));
|
||||
assertThat(processor.getDbReader().getMetadata().getDatabaseType(), equalTo("GeoLite2-City"));
|
||||
|
@ -64,7 +64,7 @@ public class GeoProcessorFactoryTests extends ESTestCase {
|
|||
Map<String, Object> config = new HashMap<>();
|
||||
config.put("ip_field", "_field");
|
||||
config.put("target_field", "_field");
|
||||
GeoIpProcessor processor = (GeoIpProcessor) factory.create(config);
|
||||
GeoIpProcessor processor = factory.create(config);
|
||||
assertThat(processor.getIpField(), equalTo("_field"));
|
||||
assertThat(processor.getTargetField(), equalTo("_field"));
|
||||
}
|
||||
|
@ -75,7 +75,7 @@ public class GeoProcessorFactoryTests extends ESTestCase {
|
|||
Map<String, Object> config = new HashMap<>();
|
||||
config.put("ip_field", "_field");
|
||||
config.put("database_file", "GeoLite2-Country.mmdb");
|
||||
GeoIpProcessor processor = (GeoIpProcessor) factory.create(config);
|
||||
GeoIpProcessor processor = factory.create(config);
|
||||
assertThat(processor.getIpField(), equalTo("_field"));
|
||||
assertThat(processor.getTargetField(), equalTo("geoip"));
|
||||
assertThat(processor.getDbReader().getMetadata().getDatabaseType(), equalTo("GeoLite2-Country"));
|
|
@ -29,7 +29,7 @@ import java.util.Map;
|
|||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
public class GeoProcessorTests extends ESTestCase {
|
||||
public class GeoIpProcessorTests extends ESTestCase {
|
||||
|
||||
public void testCity() throws Exception {
|
||||
InputStream database = GeoIpProcessor.class.getResourceAsStream("/GeoLite2-City.mmdb");
|
|
@ -49,7 +49,7 @@ public class GrokProcessorFactoryTests extends ESTestCase {
|
|||
Map<String, Object> config = new HashMap<>();
|
||||
config.put("field", "_field");
|
||||
config.put("pattern", "(?<foo>\\w+)");
|
||||
GrokProcessor processor = (GrokProcessor) factory.create(config);
|
||||
GrokProcessor processor = factory.create(config);
|
||||
assertThat(processor.getMatchField(), equalTo("_field"));
|
||||
assertThat(processor.getGrok(), notNullValue());
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue