[ML] Add an ingest pipeline definition to structure finder (#34350)
The ingest pipeline that is produced is very simple. It contains a grok processor if the format is semi-structured text, a date processor if the format contains a timestamp, and a remove processor if required to remove the interim timestamp field parsed out of semi-structured text. Eventually the UI should offer the option to customize the pipeline with additional processors to perform other data preparation steps before ingesting data to an index.
This commit is contained in:
parent
7352f0da60
commit
21c759af0e
|
@ -613,6 +613,20 @@ If the request does not encounter errors, you receive the following result:
|
|||
"type" : "double"
|
||||
}
|
||||
},
|
||||
"ingest_pipeline" : {
|
||||
"description" : "Ingest pipeline created by file structure finder",
|
||||
"processors" : [
|
||||
{
|
||||
"date" : {
|
||||
"field" : "tpep_pickup_datetime",
|
||||
"timezone" : "{{ beat.timezone }}",
|
||||
"formats" : [
|
||||
"YYYY-MM-dd HH:mm:ss"
|
||||
]
|
||||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
"field_stats" : {
|
||||
"DOLocationID" : {
|
||||
"count" : 19998,
|
||||
|
@ -1366,6 +1380,33 @@ this:
|
|||
"type" : "text"
|
||||
}
|
||||
},
|
||||
"ingest_pipeline" : {
|
||||
"description" : "Ingest pipeline created by file structure finder",
|
||||
"processors" : [
|
||||
{
|
||||
"grok" : {
|
||||
"field" : "message",
|
||||
"patterns" : [
|
||||
"\\[%{TIMESTAMP_ISO8601:timestamp}\\]\\[%{LOGLEVEL:loglevel}.*"
|
||||
]
|
||||
}
|
||||
},
|
||||
{
|
||||
"date" : {
|
||||
"field" : "timestamp",
|
||||
"timezone" : "{{ beat.timezone }}",
|
||||
"formats" : [
|
||||
"ISO8601"
|
||||
]
|
||||
}
|
||||
},
|
||||
{
|
||||
"remove" : {
|
||||
"field" : "timestamp"
|
||||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
"field_stats" : {
|
||||
"loglevel" : {
|
||||
"count" : 53,
|
||||
|
@ -1499,6 +1540,33 @@ this:
|
|||
"type" : "keyword"
|
||||
}
|
||||
},
|
||||
"ingest_pipeline" : {
|
||||
"description" : "Ingest pipeline created by file structure finder",
|
||||
"processors" : [
|
||||
{
|
||||
"grok" : {
|
||||
"field" : "message",
|
||||
"patterns" : [
|
||||
"\\[%{TIMESTAMP_ISO8601:timestamp}\\]\\[%{LOGLEVEL:loglevel} *\\]\\[%{JAVACLASS:class} *\\] \\[%{HOSTNAME:node}\\] %{JAVALOGMESSAGE:message}"
|
||||
]
|
||||
}
|
||||
},
|
||||
{
|
||||
"date" : {
|
||||
"field" : "timestamp",
|
||||
"timezone" : "{{ beat.timezone }}",
|
||||
"formats" : [
|
||||
"ISO8601"
|
||||
]
|
||||
}
|
||||
},
|
||||
{
|
||||
"remove" : {
|
||||
"field" : "timestamp"
|
||||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
"field_stats" : { <2>
|
||||
"class" : {
|
||||
"count" : 53,
|
||||
|
|
|
@ -17,6 +17,7 @@ import org.elasticsearch.common.xcontent.XContentParser;
|
|||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
|
@ -103,6 +104,7 @@ public class FileStructure implements ToXContentObject, Writeable {
|
|||
public static final ParseField JAVA_TIMESTAMP_FORMATS = new ParseField("java_timestamp_formats");
|
||||
public static final ParseField NEED_CLIENT_TIMEZONE = new ParseField("need_client_timezone");
|
||||
public static final ParseField MAPPINGS = new ParseField("mappings");
|
||||
public static final ParseField INGEST_PIPELINE = new ParseField("ingest_pipeline");
|
||||
public static final ParseField FIELD_STATS = new ParseField("field_stats");
|
||||
public static final ParseField EXPLANATION = new ParseField("explanation");
|
||||
|
||||
|
@ -128,6 +130,7 @@ public class FileStructure implements ToXContentObject, Writeable {
|
|||
PARSER.declareStringArray(Builder::setJavaTimestampFormats, JAVA_TIMESTAMP_FORMATS);
|
||||
PARSER.declareBoolean(Builder::setNeedClientTimezone, NEED_CLIENT_TIMEZONE);
|
||||
PARSER.declareObject(Builder::setMappings, (p, c) -> new TreeMap<>(p.map()), MAPPINGS);
|
||||
PARSER.declareObject(Builder::setIngestPipeline, (p, c) -> p.mapOrdered(), INGEST_PIPELINE);
|
||||
PARSER.declareObject(Builder::setFieldStats, (p, c) -> {
|
||||
Map<String, FieldStats> fieldStats = new TreeMap<>();
|
||||
while (p.nextToken() == XContentParser.Token.FIELD_NAME) {
|
||||
|
@ -157,6 +160,7 @@ public class FileStructure implements ToXContentObject, Writeable {
|
|||
private final String timestampField;
|
||||
private final boolean needClientTimezone;
|
||||
private final SortedMap<String, Object> mappings;
|
||||
private final Map<String, Object> ingestPipeline;
|
||||
private final SortedMap<String, FieldStats> fieldStats;
|
||||
private final List<String> explanation;
|
||||
|
||||
|
@ -164,8 +168,8 @@ public class FileStructure implements ToXContentObject, Writeable {
|
|||
Format format, String multilineStartPattern, String excludeLinesPattern, List<String> columnNames,
|
||||
Boolean hasHeaderRow, Character delimiter, Character quote, Boolean shouldTrimFields, String grokPattern,
|
||||
String timestampField, List<String> jodaTimestampFormats, List<String> javaTimestampFormats,
|
||||
boolean needClientTimezone, Map<String, Object> mappings, Map<String, FieldStats> fieldStats,
|
||||
List<String> explanation) {
|
||||
boolean needClientTimezone, Map<String, Object> mappings, Map<String, Object> ingestPipeline,
|
||||
Map<String, FieldStats> fieldStats, List<String> explanation) {
|
||||
|
||||
this.numLinesAnalyzed = numLinesAnalyzed;
|
||||
this.numMessagesAnalyzed = numMessagesAnalyzed;
|
||||
|
@ -188,6 +192,7 @@ public class FileStructure implements ToXContentObject, Writeable {
|
|||
(javaTimestampFormats == null) ? null : Collections.unmodifiableList(new ArrayList<>(javaTimestampFormats));
|
||||
this.needClientTimezone = needClientTimezone;
|
||||
this.mappings = Collections.unmodifiableSortedMap(new TreeMap<>(mappings));
|
||||
this.ingestPipeline = (ingestPipeline == null) ? null : Collections.unmodifiableMap(new LinkedHashMap<>(ingestPipeline));
|
||||
this.fieldStats = Collections.unmodifiableSortedMap(new TreeMap<>(fieldStats));
|
||||
this.explanation = Collections.unmodifiableList(new ArrayList<>(explanation));
|
||||
}
|
||||
|
@ -212,6 +217,7 @@ public class FileStructure implements ToXContentObject, Writeable {
|
|||
timestampField = in.readOptionalString();
|
||||
needClientTimezone = in.readBoolean();
|
||||
mappings = Collections.unmodifiableSortedMap(new TreeMap<>(in.readMap()));
|
||||
ingestPipeline = in.readBoolean() ? Collections.unmodifiableMap(in.readMap()) : null;
|
||||
fieldStats = Collections.unmodifiableSortedMap(new TreeMap<>(in.readMap(StreamInput::readString, FieldStats::new)));
|
||||
explanation = Collections.unmodifiableList(in.readList(StreamInput::readString));
|
||||
}
|
||||
|
@ -262,6 +268,12 @@ public class FileStructure implements ToXContentObject, Writeable {
|
|||
out.writeOptionalString(timestampField);
|
||||
out.writeBoolean(needClientTimezone);
|
||||
out.writeMap(mappings);
|
||||
if (ingestPipeline == null) {
|
||||
out.writeBoolean(false);
|
||||
} else {
|
||||
out.writeBoolean(true);
|
||||
out.writeMap(ingestPipeline);
|
||||
}
|
||||
out.writeMap(fieldStats, StreamOutput::writeString, (out1, value) -> value.writeTo(out1));
|
||||
out.writeCollection(explanation, StreamOutput::writeString);
|
||||
}
|
||||
|
@ -342,6 +354,10 @@ public class FileStructure implements ToXContentObject, Writeable {
|
|||
return mappings;
|
||||
}
|
||||
|
||||
public Map<String, Object> getIngestPipeline() {
|
||||
return ingestPipeline;
|
||||
}
|
||||
|
||||
public SortedMap<String, FieldStats> getFieldStats() {
|
||||
return fieldStats;
|
||||
}
|
||||
|
@ -397,6 +413,9 @@ public class FileStructure implements ToXContentObject, Writeable {
|
|||
}
|
||||
builder.field(NEED_CLIENT_TIMEZONE.getPreferredName(), needClientTimezone);
|
||||
builder.field(MAPPINGS.getPreferredName(), mappings);
|
||||
if (ingestPipeline != null) {
|
||||
builder.field(INGEST_PIPELINE.getPreferredName(), ingestPipeline);
|
||||
}
|
||||
if (fieldStats.isEmpty() == false) {
|
||||
builder.startObject(FIELD_STATS.getPreferredName());
|
||||
for (Map.Entry<String, FieldStats> entry : fieldStats.entrySet()) {
|
||||
|
@ -476,6 +495,7 @@ public class FileStructure implements ToXContentObject, Writeable {
|
|||
private List<String> javaTimestampFormats;
|
||||
private boolean needClientTimezone;
|
||||
private Map<String, Object> mappings;
|
||||
private Map<String, Object> ingestPipeline;
|
||||
private Map<String, FieldStats> fieldStats = Collections.emptyMap();
|
||||
private List<String> explanation;
|
||||
|
||||
|
@ -582,6 +602,11 @@ public class FileStructure implements ToXContentObject, Writeable {
|
|||
return this;
|
||||
}
|
||||
|
||||
public Builder setIngestPipeline(Map<String, Object> ingestPipeline) {
|
||||
this.ingestPipeline = ingestPipeline;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder setFieldStats(Map<String, FieldStats> fieldStats) {
|
||||
this.fieldStats = Objects.requireNonNull(fieldStats);
|
||||
return this;
|
||||
|
@ -708,7 +733,8 @@ public class FileStructure implements ToXContentObject, Writeable {
|
|||
|
||||
return new FileStructure(numLinesAnalyzed, numMessagesAnalyzed, sampleStart, charset, hasByteOrderMarker, format,
|
||||
multilineStartPattern, excludeLinesPattern, columnNames, hasHeaderRow, delimiter, quote, shouldTrimFields, grokPattern,
|
||||
timestampField, jodaTimestampFormats, javaTimestampFormats, needClientTimezone, mappings, fieldStats, explanation);
|
||||
timestampField, jodaTimestampFormats, javaTimestampFormats, needClientTimezone, mappings, ingestPipeline, fieldStats,
|
||||
explanation);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -14,6 +14,7 @@ import java.nio.charset.Charset;
|
|||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.EnumSet;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.TreeMap;
|
||||
|
@ -74,6 +75,14 @@ public class FileStructureTests extends AbstractSerializingTestCase<FileStructur
|
|||
}
|
||||
builder.setMappings(mappings);
|
||||
|
||||
if (randomBoolean()) {
    // Randomly include an ingest pipeline definition in the test structure.
    Map<String, Object> ingestPipeline = new LinkedHashMap<>();
    for (String field : generateRandomStringArray(5, 20, false, false)) {
        ingestPipeline.put(field, Collections.singletonMap(randomAlphaOfLength(5), randomAlphaOfLength(10)));
    }
    // Bug fix: this previously called setMappings(ingestPipeline), which silently
    // overwrote the mappings set above and left the ingest pipeline unset, so the
    // ingest_pipeline field was never exercised by serialization round-trip tests.
    builder.setIngestPipeline(ingestPipeline);
}
|
||||
|
||||
if (randomBoolean()) {
|
||||
Map<String, FieldStats> fieldStats = new TreeMap<>();
|
||||
for (String field : generateRandomStringArray(5, 20, false, false)) {
|
||||
|
|
|
@ -142,10 +142,14 @@ public class DelimitedFileStructureFinder implements FileStructureFinder {
|
|||
.collect(Collectors.joining(",")));
|
||||
}
|
||||
|
||||
boolean needClientTimeZone = timeField.v2().hasTimezoneDependentParsing();
|
||||
|
||||
structureBuilder.setTimestampField(timeField.v1())
|
||||
.setJodaTimestampFormats(timeField.v2().jodaTimestampFormats)
|
||||
.setJavaTimestampFormats(timeField.v2().javaTimestampFormats)
|
||||
.setNeedClientTimezone(timeField.v2().hasTimezoneDependentParsing())
|
||||
.setNeedClientTimezone(needClientTimeZone)
|
||||
.setIngestPipeline(FileStructureUtils.makeIngestPipelineDefinition(null, timeField.v1(),
|
||||
timeField.v2().jodaTimestampFormats, needClientTimeZone))
|
||||
.setMultilineStartPattern(timeLineRegex);
|
||||
}
|
||||
|
||||
|
|
|
@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml.filestructurefinder;
|
|||
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.grok.Grok;
|
||||
import org.elasticsearch.ingest.Pipeline;
|
||||
import org.elasticsearch.xpack.core.ml.filestructurefinder.FieldStats;
|
||||
import org.elasticsearch.xpack.ml.filestructurefinder.TimestampFormatFinder.TimestampMatch;
|
||||
|
||||
|
@ -15,6 +16,7 @@ import java.util.Arrays;
|
|||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
@ -37,6 +39,8 @@ public final class FileStructureUtils {
|
|||
private static final int KEYWORD_MAX_LEN = 256;
|
||||
private static final int KEYWORD_MAX_SPACES = 5;
|
||||
|
||||
private static final String BEAT_TIMEZONE_FIELD = "beat.timezone";
|
||||
|
||||
private FileStructureUtils() {
|
||||
}
|
||||
|
||||
|
@ -306,4 +310,53 @@ public final class FileStructureUtils {
|
|||
int length = str.length();
|
||||
return length > KEYWORD_MAX_LEN || length - str.replaceAll("\\s", "").length() > KEYWORD_MAX_SPACES;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create an ingest pipeline definition appropriate for the file structure.
|
||||
* @param grokPattern The Grok pattern used for parsing semi-structured text formats. <code>null</code> for
|
||||
* fully structured formats.
|
||||
* @param timestampField The input field containing the timestamp to be parsed into <code>@timestamp</code>.
|
||||
* <code>null</code> if there is no timestamp.
|
||||
* @param timestampFormats Timestamp formats to be used for parsing {@code timestampField}.
|
||||
* May be <code>null</code> if {@code timestampField} is also <code>null</code>.
|
||||
* @param needClientTimezone Is the timezone of the client supplying data to ingest required to uniquely parse the timestamp?
|
||||
* @return The ingest pipeline definition, or <code>null</code> if none is required.
|
||||
*/
|
||||
public static Map<String, Object> makeIngestPipelineDefinition(String grokPattern, String timestampField, List<String> timestampFormats,
|
||||
boolean needClientTimezone) {
|
||||
|
||||
if (grokPattern == null && timestampField == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
Map<String, Object> pipeline = new LinkedHashMap<>();
|
||||
pipeline.put(Pipeline.DESCRIPTION_KEY, "Ingest pipeline created by file structure finder");
|
||||
|
||||
List<Map<String, Object>> processors = new ArrayList<>();
|
||||
|
||||
if (grokPattern != null) {
|
||||
Map<String, Object> grokProcessorSettings = new LinkedHashMap<>();
|
||||
grokProcessorSettings.put("field", "message");
|
||||
grokProcessorSettings.put("patterns", Collections.singletonList(grokPattern));
|
||||
processors.add(Collections.singletonMap("grok", grokProcessorSettings));
|
||||
}
|
||||
|
||||
if (timestampField != null) {
|
||||
Map<String, Object> dateProcessorSettings = new LinkedHashMap<>();
|
||||
dateProcessorSettings.put("field", timestampField);
|
||||
if (needClientTimezone) {
|
||||
dateProcessorSettings.put("timezone", "{{ " + BEAT_TIMEZONE_FIELD + " }}");
|
||||
}
|
||||
dateProcessorSettings.put("formats", timestampFormats);
|
||||
processors.add(Collections.singletonMap("date", dateProcessorSettings));
|
||||
}
|
||||
|
||||
// This removes the interim timestamp field used for semi-structured text formats
|
||||
if (grokPattern != null && timestampField != null) {
|
||||
processors.add(Collections.singletonMap("remove", Collections.singletonMap("field", timestampField)));
|
||||
}
|
||||
|
||||
pipeline.put(Pipeline.PROCESSORS_KEY, processors);
|
||||
return pipeline;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -56,10 +56,14 @@ public class JsonFileStructureFinder implements FileStructureFinder {
|
|||
Tuple<String, TimestampMatch> timeField =
|
||||
FileStructureUtils.guessTimestampField(explanation, sampleRecords, overrides, timeoutChecker);
|
||||
if (timeField != null) {
|
||||
boolean needClientTimeZone = timeField.v2().hasTimezoneDependentParsing();
|
||||
|
||||
structureBuilder.setTimestampField(timeField.v1())
|
||||
.setJodaTimestampFormats(timeField.v2().jodaTimestampFormats)
|
||||
.setJavaTimestampFormats(timeField.v2().javaTimestampFormats)
|
||||
.setNeedClientTimezone(timeField.v2().hasTimezoneDependentParsing());
|
||||
.setNeedClientTimezone(needClientTimeZone)
|
||||
.setIngestPipeline(FileStructureUtils.makeIngestPipelineDefinition(null, timeField.v1(),
|
||||
timeField.v2().jodaTimestampFormats, needClientTimeZone));
|
||||
}
|
||||
|
||||
Tuple<SortedMap<String, Object>, SortedMap<String, FieldStats>> mappingsAndFieldStats =
|
||||
|
|
|
@ -113,12 +113,16 @@ public class TextLogFileStructureFinder implements FileStructureFinder {
|
|||
}
|
||||
}
|
||||
|
||||
boolean needClientTimeZone = bestTimestamp.v1().hasTimezoneDependentParsing();
|
||||
|
||||
FileStructure structure = structureBuilder
|
||||
.setTimestampField(interimTimestampField)
|
||||
.setJodaTimestampFormats(bestTimestamp.v1().jodaTimestampFormats)
|
||||
.setJavaTimestampFormats(bestTimestamp.v1().javaTimestampFormats)
|
||||
.setNeedClientTimezone(bestTimestamp.v1().hasTimezoneDependentParsing())
|
||||
.setNeedClientTimezone(needClientTimeZone)
|
||||
.setGrokPattern(grokPattern)
|
||||
.setIngestPipeline(FileStructureUtils.makeIngestPipelineDefinition(grokPattern, interimTimestampField,
|
||||
bestTimestamp.v1().jodaTimestampFormats, needClientTimeZone))
|
||||
.setMappings(mappings)
|
||||
.setFieldStats(fieldStats)
|
||||
.setExplanation(explanation)
|
||||
|
|
|
@ -95,10 +95,14 @@ public class XmlFileStructureFinder implements FileStructureFinder {
|
|||
Tuple<String, TimestampMatch> timeField =
|
||||
FileStructureUtils.guessTimestampField(explanation, sampleRecords, overrides, timeoutChecker);
|
||||
if (timeField != null) {
|
||||
boolean needClientTimeZone = timeField.v2().hasTimezoneDependentParsing();
|
||||
|
||||
structureBuilder.setTimestampField(timeField.v1())
|
||||
.setJodaTimestampFormats(timeField.v2().jodaTimestampFormats)
|
||||
.setJavaTimestampFormats(timeField.v2().javaTimestampFormats)
|
||||
.setNeedClientTimezone(timeField.v2().hasTimezoneDependentParsing());
|
||||
.setNeedClientTimezone(needClientTimeZone)
|
||||
.setIngestPipeline(FileStructureUtils.makeIngestPipelineDefinition(null, topLevelTag + "." + timeField.v1(),
|
||||
timeField.v2().jodaTimestampFormats, needClientTimeZone));
|
||||
}
|
||||
|
||||
Tuple<SortedMap<String, Object>, SortedMap<String, FieldStats>> mappingsAndFieldStats =
|
||||
|
|
|
@ -345,6 +345,75 @@ public class FileStructureUtilsTests extends FileStructureTestCase {
|
|||
assertNull(fieldStats.get("nothing"));
|
||||
}
|
||||
|
||||
public void testMakeIngestPipelineDefinitionGivenStructuredWithoutTimestamp() {
|
||||
|
||||
assertNull(FileStructureUtils.makeIngestPipelineDefinition(null, null, null, false));
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testMakeIngestPipelineDefinitionGivenStructuredWithTimestamp() {
|
||||
|
||||
String timestampField = randomAlphaOfLength(10);
|
||||
List<String> timestampFormats = randomFrom(TimestampFormatFinder.ORDERED_CANDIDATE_FORMATS).jodaTimestampFormats;
|
||||
boolean needClientTimezone = randomBoolean();
|
||||
|
||||
Map<String, Object> pipeline =
|
||||
FileStructureUtils.makeIngestPipelineDefinition(null, timestampField, timestampFormats, needClientTimezone);
|
||||
assertNotNull(pipeline);
|
||||
|
||||
assertEquals("Ingest pipeline created by file structure finder", pipeline.remove("description"));
|
||||
|
||||
List<Map<String, Object>> processors = (List<Map<String, Object>>) pipeline.remove("processors");
|
||||
assertNotNull(processors);
|
||||
assertEquals(1, processors.size());
|
||||
|
||||
Map<String, Object> dateProcessor = (Map<String, Object>) processors.get(0).get("date");
|
||||
assertNotNull(dateProcessor);
|
||||
assertEquals(timestampField, dateProcessor.get("field"));
|
||||
assertEquals(needClientTimezone, dateProcessor.containsKey("timezone"));
|
||||
assertEquals(timestampFormats, dateProcessor.get("formats"));
|
||||
|
||||
// After removing the two expected fields there should be nothing left in the pipeline
|
||||
assertEquals(Collections.emptyMap(), pipeline);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testMakeIngestPipelineDefinitionGivenSemiStructured() {
|
||||
|
||||
String grokPattern = randomAlphaOfLength(100);
|
||||
String timestampField = randomAlphaOfLength(10);
|
||||
List<String> timestampFormats = randomFrom(TimestampFormatFinder.ORDERED_CANDIDATE_FORMATS).jodaTimestampFormats;
|
||||
boolean needClientTimezone = randomBoolean();
|
||||
|
||||
Map<String, Object> pipeline =
|
||||
FileStructureUtils.makeIngestPipelineDefinition(grokPattern, timestampField, timestampFormats, needClientTimezone);
|
||||
assertNotNull(pipeline);
|
||||
|
||||
assertEquals("Ingest pipeline created by file structure finder", pipeline.remove("description"));
|
||||
|
||||
List<Map<String, Object>> processors = (List<Map<String, Object>>) pipeline.remove("processors");
|
||||
assertNotNull(processors);
|
||||
assertEquals(3, processors.size());
|
||||
|
||||
Map<String, Object> grokProcessor = (Map<String, Object>) processors.get(0).get("grok");
|
||||
assertNotNull(grokProcessor);
|
||||
assertEquals("message", grokProcessor.get("field"));
|
||||
assertEquals(Collections.singletonList(grokPattern), grokProcessor.get("patterns"));
|
||||
|
||||
Map<String, Object> dateProcessor = (Map<String, Object>) processors.get(1).get("date");
|
||||
assertNotNull(dateProcessor);
|
||||
assertEquals(timestampField, dateProcessor.get("field"));
|
||||
assertEquals(needClientTimezone, dateProcessor.containsKey("timezone"));
|
||||
assertEquals(timestampFormats, dateProcessor.get("formats"));
|
||||
|
||||
Map<String, Object> removeProcessor = (Map<String, Object>) processors.get(2).get("remove");
|
||||
assertNotNull(removeProcessor);
|
||||
assertEquals(timestampField, dateProcessor.get("field"));
|
||||
|
||||
// After removing the two expected fields there should be nothing left in the pipeline
|
||||
assertEquals(Collections.emptyMap(), pipeline);
|
||||
}
|
||||
|
||||
private Map<String, String> guessMapping(List<String> explanation, String fieldName, List<Object> fieldValues) {
|
||||
Tuple<Map<String, String>, FieldStats> mappingAndFieldStats = FileStructureUtils.guessMappingAndCalculateFieldStats(explanation,
|
||||
fieldName, fieldValues, NOOP_TIMEOUT_CHECKER);
|
||||
|
|
|
@ -36,6 +36,9 @@
|
|||
- match: { mappings.sourcetype.type: keyword }
|
||||
- match: { mappings.time.type: date }
|
||||
- match: { mappings.time.format: epoch_second }
|
||||
- match: { ingest_pipeline.description: "Ingest pipeline created by file structure finder" }
|
||||
- match: { ingest_pipeline.processors.0.date.field: time }
|
||||
- match: { ingest_pipeline.processors.0.date.formats.0: UNIX }
|
||||
- match: { field_stats.airline.count: 3 }
|
||||
- match: { field_stats.airline.cardinality: 2 }
|
||||
- match: { field_stats.responsetime.count: 3 }
|
||||
|
@ -93,6 +96,9 @@
|
|||
- match: { mappings.sourcetype.type: keyword }
|
||||
- match: { mappings.time.type: date }
|
||||
- match: { mappings.time.format: epoch_second }
|
||||
- match: { ingest_pipeline.description: "Ingest pipeline created by file structure finder" }
|
||||
- match: { ingest_pipeline.processors.0.date.field: time }
|
||||
- match: { ingest_pipeline.processors.0.date.formats.0: UNIX }
|
||||
- match: { field_stats.airline.count: 3 }
|
||||
- match: { field_stats.airline.cardinality: 2 }
|
||||
- match: { field_stats.responsetime.count: 3 }
|
||||
|
|
Loading…
Reference in New Issue