From 00dc2ba36f6c2f97d0967108c69a42973f51340f Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Wed, 31 Oct 2018 10:55:11 +0000 Subject: [PATCH] [ML] Enable reusing field extraction logic when no time field is required (#35100) --- .../{scroll => fields}/ExtractedField.java | 4 +- .../{scroll => fields}/ExtractedFields.java | 76 +++-------- .../fields/TimeBasedExtractedFields.java | 66 ++++++++++ .../extractor/scroll/ScrollDataExtractor.java | 1 + .../scroll/ScrollDataExtractorContext.java | 5 +- .../scroll/ScrollDataExtractorFactory.java | 10 +- .../scroll/SearchHitToJsonProcessor.java | 2 + .../ExtractedFieldTests.java | 3 +- .../fields/ExtractedFieldsTests.java | 120 ++++++++++++++++++ .../TimeBasedExtractedFieldsTests.java} | 61 ++++----- .../scroll/ScrollDataExtractorTests.java | 6 +- .../scroll/SearchHitToJsonProcessorTests.java | 9 +- .../test/ml/start_stop_datafeed.yml | 2 +- 13 files changed, 262 insertions(+), 103 deletions(-) rename x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/{scroll => fields}/ExtractedField.java (98%) rename x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/{scroll => fields}/ExtractedFields.java (51%) create mode 100644 x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/fields/TimeBasedExtractedFields.java rename x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/{scroll => fields}/ExtractedFieldTests.java (98%) create mode 100644 x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/fields/ExtractedFieldsTests.java rename x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/{scroll/ExtractedFieldsTests.java => fields/TimeBasedExtractedFieldsTests.java} (80%) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ExtractedField.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/fields/ExtractedField.java similarity index 98% rename from x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ExtractedField.java rename to x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/fields/ExtractedField.java index f06eaaf8139..232cd53a359 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ExtractedField.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/fields/ExtractedField.java @@ -3,7 +3,7 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ -package org.elasticsearch.xpack.ml.datafeed.extractor.scroll; +package org.elasticsearch.xpack.ml.datafeed.extractor.fields; import org.elasticsearch.common.document.DocumentField; import org.elasticsearch.search.SearchHit; @@ -18,7 +18,7 @@ import java.util.Objects; * Represents a field to be extracted by the datafeed. * It encapsulates the extraction logic. */ -abstract class ExtractedField { +public abstract class ExtractedField { public enum ExtractionMethod { SOURCE, DOC_VALUE, SCRIPT_FIELD diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ExtractedFields.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/fields/ExtractedFields.java similarity index 51% rename from x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ExtractedFields.java rename to x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/fields/ExtractedFields.java index 2137141e4fb..f9b2467fbcf 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ExtractedFields.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/fields/ExtractedFields.java @@ -3,18 +3,13 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ -package org.elasticsearch.xpack.ml.datafeed.extractor.scroll; +package org.elasticsearch.xpack.ml.datafeed.extractor.fields; import org.elasticsearch.action.fieldcaps.FieldCapabilities; import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse; -import org.elasticsearch.search.SearchHit; -import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; -import org.elasticsearch.xpack.core.ml.job.config.Job; -import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.core.ml.utils.MlStrings; -import java.util.ArrayList; -import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; @@ -25,20 +20,15 @@ import java.util.stream.Collectors; /** * The fields the datafeed has to extract */ -class ExtractedFields { +public class ExtractedFields { private static final String TEXT = "text"; - private final ExtractedField timeField; private final List allFields; private final List docValueFields; private final String[] sourceFields; - ExtractedFields(ExtractedField timeField, List allFields) { - if (!allFields.contains(timeField)) { - throw new IllegalArgumentException("timeField should also be contained in allFields"); - } - this.timeField = Objects.requireNonNull(timeField); + public ExtractedFields(List allFields) { this.allFields = Collections.unmodifiableList(allFields); this.docValueFields = filterFields(ExtractedField.ExtractionMethod.DOC_VALUE, allFields); this.sourceFields = filterFields(ExtractedField.ExtractionMethod.SOURCE, allFields).stream().map(ExtractedField::getName) @@ -61,60 +51,33 @@ class ExtractedFields { return fields.stream().filter(field -> field.getExtractionMethod() == method).collect(Collectors.toList()); } - public String timeField() { - return timeField.getName(); + public static ExtractedFields build(Collection allFields, Set scriptFields, + FieldCapabilitiesResponse fieldsCapabilities) { + ExtractionMethodDetector extractionMethodDetector = new ExtractionMethodDetector(scriptFields, fieldsCapabilities); + return new ExtractedFields(allFields.stream().map(field -> extractionMethodDetector.detect(field)).collect(Collectors.toList())); } - public Long timeFieldValue(SearchHit hit) { - Object[] value = timeField.value(hit); - if (value.length != 1) { - throw new RuntimeException("Time field [" + timeField.getAlias() + "] expected a single value; actual was: " - + Arrays.toString(value)); - } - if (value[0] instanceof Long) { - return (Long) value[0]; - } - throw new RuntimeException("Time field [" + timeField.getAlias() + "] expected a long value; actual was: " + value[0]); - } + protected static class ExtractionMethodDetector { - public static ExtractedFields build(Job job, DatafeedConfig datafeed, FieldCapabilitiesResponse fieldsCapabilities) { - Set scriptFields = datafeed.getScriptFields().stream().map(sf -> sf.fieldName()).collect(Collectors.toSet()); - ExtractionMethodDetector extractionMethodDetector = new ExtractionMethodDetector(datafeed.getId(), scriptFields, - fieldsCapabilities); - String timeField = job.getDataDescription().getTimeField(); - if (scriptFields.contains(timeField) == false && extractionMethodDetector.isAggregatable(timeField) == false) { - throw ExceptionsHelper.badRequestException("datafeed [" + datafeed.getId() + "] cannot retrieve time field [" + timeField - + "] because it is not aggregatable"); - } - ExtractedField timeExtractedField = ExtractedField.newTimeField(timeField, scriptFields.contains(timeField) ? - ExtractedField.ExtractionMethod.SCRIPT_FIELD : ExtractedField.ExtractionMethod.DOC_VALUE); - List remainingFields = job.allInputFields().stream().filter(f -> !f.equals(timeField)).collect(Collectors.toList()); - List allExtractedFields = new ArrayList<>(remainingFields.size() + 1); - allExtractedFields.add(timeExtractedField); - remainingFields.stream().forEach(field -> allExtractedFields.add(extractionMethodDetector.detect(field))); - return new ExtractedFields(timeExtractedField, allExtractedFields); - } - - private static class ExtractionMethodDetector { - - private final String datafeedId; private final Set scriptFields; private final FieldCapabilitiesResponse fieldsCapabilities; - private ExtractionMethodDetector(String datafeedId, Set scriptFields, FieldCapabilitiesResponse fieldsCapabilities) { - this.datafeedId = datafeedId; + protected ExtractionMethodDetector(Set scriptFields, FieldCapabilitiesResponse fieldsCapabilities) { this.scriptFields = scriptFields; this.fieldsCapabilities = fieldsCapabilities; } - private ExtractedField detect(String field) { + protected ExtractedField detect(String field) { String internalField = field; ExtractedField.ExtractionMethod method = ExtractedField.ExtractionMethod.SOURCE; if (scriptFields.contains(field)) { method = ExtractedField.ExtractionMethod.SCRIPT_FIELD; } else if (isAggregatable(field)) { method = ExtractedField.ExtractionMethod.DOC_VALUE; - } else if (isText(field)) { + if (isFieldOfType(field, "date")) { + return ExtractedField.newTimeField(field, method); + } + } else if (isFieldOfType(field, TEXT)) { String parentField = MlStrings.getParentField(field); // Field is text so check if it is a multi-field if (Objects.equals(parentField, field) == false && fieldsCapabilities.getField(parentField) != null) { @@ -127,11 +90,10 @@ class ExtractedFields { return ExtractedField.newField(field, internalField, method); } - private boolean isAggregatable(String field) { + protected boolean isAggregatable(String field) { Map fieldCaps = fieldsCapabilities.getField(field); if (fieldCaps == null || fieldCaps.isEmpty()) { - throw ExceptionsHelper.badRequestException("datafeed [" + datafeedId + "] cannot retrieve field [" + field - + "] because it has no mappings"); + throw new IllegalArgumentException("cannot retrieve field [" + field + "] because it has no mappings"); } for (FieldCapabilities capsPerIndex : fieldCaps.values()) { if (!capsPerIndex.isAggregatable()) { @@ -141,10 +103,10 @@ class ExtractedFields { return true; } - private boolean isText(String field) { + private boolean isFieldOfType(String field, String type) { Map fieldCaps = fieldsCapabilities.getField(field); if (fieldCaps != null && fieldCaps.size() == 1) { - return fieldCaps.containsKey(TEXT); + return fieldCaps.containsKey(type); } return false; } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/fields/TimeBasedExtractedFields.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/fields/TimeBasedExtractedFields.java new file mode 100644 index 00000000000..cf87671bf33 --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/fields/TimeBasedExtractedFields.java @@ -0,0 +1,66 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.datafeed.extractor.fields; + +import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; +import org.elasticsearch.xpack.core.ml.job.config.Job; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * The fields to extract for a datafeed that requires a time field + */ +public class TimeBasedExtractedFields extends ExtractedFields { + + private final ExtractedField timeField; + + public TimeBasedExtractedFields(ExtractedField timeField, List allFields) { + super(allFields); + if (!allFields.contains(timeField)) { + throw new IllegalArgumentException("timeField should also be contained in allFields"); + } + this.timeField = Objects.requireNonNull(timeField); + } + + public String timeField() { + return timeField.getName(); + } + + public Long timeFieldValue(SearchHit hit) { + Object[] value = timeField.value(hit); + if (value.length != 1) { + throw new RuntimeException("Time field [" + timeField.getAlias() + "] expected a single value; actual was: " + + Arrays.toString(value)); + } + if (value[0] instanceof Long) { + return (Long) value[0]; + } + throw new RuntimeException("Time field [" + timeField.getAlias() + "] expected a long value; actual was: " + value[0]); + } + + public static TimeBasedExtractedFields build(Job job, DatafeedConfig datafeed, FieldCapabilitiesResponse fieldsCapabilities) { + Set scriptFields = datafeed.getScriptFields().stream().map(sf -> sf.fieldName()).collect(Collectors.toSet()); + ExtractionMethodDetector extractionMethodDetector = new ExtractionMethodDetector(scriptFields, fieldsCapabilities); + String timeField = job.getDataDescription().getTimeField(); + if (scriptFields.contains(timeField) == false && extractionMethodDetector.isAggregatable(timeField) == false) { + throw new IllegalArgumentException("cannot retrieve time field [" + timeField + "] because it is not aggregatable"); + } + ExtractedField timeExtractedField = ExtractedField.newTimeField(timeField, scriptFields.contains(timeField) ? + ExtractedField.ExtractionMethod.SCRIPT_FIELD : ExtractedField.ExtractionMethod.DOC_VALUE); + List remainingFields = job.allInputFields().stream().filter(f -> !f.equals(timeField)).collect(Collectors.toList()); + List allExtractedFields = new ArrayList<>(remainingFields.size() + 1); + allExtractedFields.add(timeExtractedField); + remainingFields.stream().forEach(field -> allExtractedFields.add(extractionMethodDetector.detect(field))); + return new TimeBasedExtractedFields(timeExtractedField, allExtractedFields); + } +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractor.java index ae62453dff5..d890ce8a3fe 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractor.java @@ -23,6 +23,7 @@ import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor; import org.elasticsearch.xpack.core.ml.datafeed.extractor.ExtractorUtils; +import org.elasticsearch.xpack.ml.datafeed.extractor.fields.ExtractedField; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorContext.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorContext.java index d1666497d24..08e693849ec 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorContext.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorContext.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml.datafeed.extractor.scroll; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.xpack.ml.datafeed.extractor.fields.TimeBasedExtractedFields; import java.util.List; import java.util.Map; @@ -15,7 +16,7 @@ import java.util.Objects; class ScrollDataExtractorContext { final String jobId; - final ExtractedFields extractedFields; + final TimeBasedExtractedFields extractedFields; final String[] indices; final String[] types; final QueryBuilder query; @@ -25,7 +26,7 @@ class ScrollDataExtractorContext { final long end; final Map headers; - ScrollDataExtractorContext(String jobId, ExtractedFields extractedFields, List indices, List types, + ScrollDataExtractorContext(String jobId, TimeBasedExtractedFields extractedFields, List indices, List types, QueryBuilder query, List scriptFields, int scrollSize, long start, long end, Map headers) { this.jobId = Objects.requireNonNull(jobId); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorFactory.java index 2c6e0deaebd..67689bd51b8 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorFactory.java @@ -16,8 +16,10 @@ import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor; import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.core.ml.utils.MlStrings; import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory; +import org.elasticsearch.xpack.ml.datafeed.extractor.fields.TimeBasedExtractedFields; import java.util.Objects; @@ -26,9 +28,9 @@ public class ScrollDataExtractorFactory implements DataExtractorFactory { private final Client client; private final DatafeedConfig datafeedConfig; private final Job job; - private final ExtractedFields extractedFields; + private final TimeBasedExtractedFields extractedFields; - private ScrollDataExtractorFactory(Client client, DatafeedConfig datafeedConfig, Job job, ExtractedFields extractedFields) { + private ScrollDataExtractorFactory(Client client, DatafeedConfig datafeedConfig, Job job, TimeBasedExtractedFields extractedFields) { this.client = Objects.requireNonNull(client); this.datafeedConfig = Objects.requireNonNull(datafeedConfig); this.job = Objects.requireNonNull(job); @@ -56,12 +58,14 @@ public class ScrollDataExtractorFactory implements DataExtractorFactory { // Step 2. Contruct the factory and notify listener ActionListener fieldCapabilitiesHandler = ActionListener.wrap( fieldCapabilitiesResponse -> { - ExtractedFields extractedFields = ExtractedFields.build(job, datafeed, fieldCapabilitiesResponse); + TimeBasedExtractedFields extractedFields = TimeBasedExtractedFields.build(job, datafeed, fieldCapabilitiesResponse); listener.onResponse(new ScrollDataExtractorFactory(client, datafeed, job, extractedFields)); }, e -> { if (e instanceof IndexNotFoundException) { listener.onFailure(new ResourceNotFoundException("datafeed [" + datafeed.getId() + "] cannot retrieve data because index " + ((IndexNotFoundException) e).getIndex() + " does not exist")); + } else if (e instanceof IllegalArgumentException) { + listener.onFailure(ExceptionsHelper.badRequestException("[" + datafeed.getId() + "] " + e.getMessage())); } else { listener.onFailure(e); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/SearchHitToJsonProcessor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/SearchHitToJsonProcessor.java index 52808ce3978..577d114d957 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/SearchHitToJsonProcessor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/SearchHitToJsonProcessor.java @@ -9,6 +9,8 @@ import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.search.SearchHit; +import org.elasticsearch.xpack.ml.datafeed.extractor.fields.ExtractedField; +import org.elasticsearch.xpack.ml.datafeed.extractor.fields.ExtractedFields; import java.io.IOException; import java.io.OutputStream; diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ExtractedFieldTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/fields/ExtractedFieldTests.java similarity index 98% rename from x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ExtractedFieldTests.java rename to x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/fields/ExtractedFieldTests.java index d29769b607e..1e5e6fa652d 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ExtractedFieldTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/fields/ExtractedFieldTests.java @@ -3,11 +3,12 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ -package org.elasticsearch.xpack.ml.datafeed.extractor.scroll; +package org.elasticsearch.xpack.ml.datafeed.extractor.fields; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.fetch.subphase.DocValueFieldsContext; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.ml.datafeed.extractor.fields.ExtractedField; import org.elasticsearch.xpack.ml.test.SearchHitBuilder; import org.joda.time.DateTime; diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/fields/ExtractedFieldsTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/fields/ExtractedFieldsTests.java new file mode 100644 index 00000000000..22253114136 --- /dev/null +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/fields/ExtractedFieldsTests.java @@ -0,0 +1,120 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.datafeed.extractor.fields; + +import org.elasticsearch.action.fieldcaps.FieldCapabilities; +import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse; +import org.elasticsearch.search.fetch.subphase.DocValueFieldsContext; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; +import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; +import org.elasticsearch.xpack.core.ml.job.config.DataDescription; +import org.elasticsearch.xpack.core.ml.job.config.Detector; +import org.elasticsearch.xpack.core.ml.job.config.Job; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; + +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class ExtractedFieldsTests extends ESTestCase { + + public void testAllTypesOfFields() { + ExtractedField docValue1 = ExtractedField.newField("doc1", ExtractedField.ExtractionMethod.DOC_VALUE); + ExtractedField docValue2 = ExtractedField.newField("doc2", ExtractedField.ExtractionMethod.DOC_VALUE); + ExtractedField scriptField1 = ExtractedField.newField("scripted1", ExtractedField.ExtractionMethod.SCRIPT_FIELD); + ExtractedField scriptField2 = ExtractedField.newField("scripted2", ExtractedField.ExtractionMethod.SCRIPT_FIELD); + ExtractedField sourceField1 = ExtractedField.newField("src1", ExtractedField.ExtractionMethod.SOURCE); + ExtractedField sourceField2 = ExtractedField.newField("src2", ExtractedField.ExtractionMethod.SOURCE); + ExtractedFields extractedFields = new ExtractedFields(Arrays.asList( + docValue1, docValue2, scriptField1, scriptField2, sourceField1, sourceField2)); + + assertThat(extractedFields.getAllFields().size(), equalTo(6)); + assertThat(extractedFields.getDocValueFields().stream().map(ExtractedField::getName).toArray(String[]::new), + equalTo(new String[] {"doc1", "doc2"})); + assertThat(extractedFields.getSourceFields(), equalTo(new String[] {"src1", "src2"})); + } + + public void testBuildGivenMixtureOfTypes() { + Map timeCaps = new HashMap<>(); + timeCaps.put("date", createFieldCaps(true)); + Map valueCaps = new HashMap<>(); + valueCaps.put("float", createFieldCaps(true)); + valueCaps.put("keyword", createFieldCaps(true)); + Map airlineCaps = new HashMap<>(); + airlineCaps.put("text", createFieldCaps(false)); + FieldCapabilitiesResponse fieldCapabilitiesResponse = mock(FieldCapabilitiesResponse.class); + when(fieldCapabilitiesResponse.getField("time")).thenReturn(timeCaps); + when(fieldCapabilitiesResponse.getField("value")).thenReturn(valueCaps); + when(fieldCapabilitiesResponse.getField("airline")).thenReturn(airlineCaps); + + ExtractedFields extractedFields = ExtractedFields.build(Arrays.asList("time", "value", "airline", "airport"), + new HashSet<>(Collections.singletonList("airport")), fieldCapabilitiesResponse); + + assertThat(extractedFields.getDocValueFields().size(), equalTo(2)); + assertThat(extractedFields.getDocValueFields().get(0).getName(), equalTo("time")); + assertThat(extractedFields.getDocValueFields().get(0).getDocValueFormat(), equalTo("epoch_millis")); + assertThat(extractedFields.getDocValueFields().get(1).getName(), equalTo("value")); + assertThat(extractedFields.getDocValueFields().get(1).getDocValueFormat(), equalTo(DocValueFieldsContext.USE_DEFAULT_FORMAT)); + assertThat(extractedFields.getSourceFields(), equalTo(new String[] {"airline"})); + assertThat(extractedFields.getAllFields().size(), equalTo(4)); + } + + public void testBuildGivenMultiFields() { + Job.Builder jobBuilder = new Job.Builder("foo"); + jobBuilder.setDataDescription(new DataDescription.Builder()); + Detector.Builder detector = new Detector.Builder("count", null); + detector.setByFieldName("airline.text"); + detector.setOverFieldName("airport.keyword"); + jobBuilder.setAnalysisConfig(new AnalysisConfig.Builder(Collections.singletonList(detector.build()))); + + DatafeedConfig.Builder datafeedBuilder = new DatafeedConfig.Builder("feed", jobBuilder.getId()); + datafeedBuilder.setIndices(Collections.singletonList("foo")); + + Map text = new HashMap<>(); + text.put("text", createFieldCaps(false)); + Map keyword = new HashMap<>(); + keyword.put("keyword", createFieldCaps(true)); + FieldCapabilitiesResponse fieldCapabilitiesResponse = mock(FieldCapabilitiesResponse.class); + when(fieldCapabilitiesResponse.getField("airline")).thenReturn(text); + when(fieldCapabilitiesResponse.getField("airline.text")).thenReturn(text); + when(fieldCapabilitiesResponse.getField("airport")).thenReturn(text); + when(fieldCapabilitiesResponse.getField("airport.keyword")).thenReturn(keyword); + + ExtractedFields extractedFields = ExtractedFields.build(Arrays.asList("airline.text", "airport.keyword"), + Collections.emptySet(), fieldCapabilitiesResponse); + + assertThat(extractedFields.getDocValueFields().size(), equalTo(1)); + assertThat(extractedFields.getDocValueFields().get(0).getName(), equalTo("airport.keyword")); + assertThat(extractedFields.getSourceFields().length, equalTo(1)); + assertThat(extractedFields.getSourceFields()[0], equalTo("airline")); + assertThat(extractedFields.getAllFields().size(), equalTo(2)); + + assertThat(extractedFields.getAllFields().stream().filter(f -> f.getName().equals("airport.keyword")).findFirst().get().getAlias(), + equalTo("airport.keyword")); + assertThat(extractedFields.getAllFields().stream().filter(f -> f.getName().equals("airline")).findFirst().get().getAlias(), + equalTo("airline.text")); + } + + public void testBuildGivenFieldWithoutMappings() { + FieldCapabilitiesResponse fieldCapabilitiesResponse = mock(FieldCapabilitiesResponse.class); + + IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> ExtractedFields.build( + Collections.singletonList("value"), Collections.emptySet(), fieldCapabilitiesResponse)); + assertThat(e.getMessage(), equalTo("cannot retrieve field [value] because it has no mappings")); + } + + private static FieldCapabilities createFieldCaps(boolean isAggregatable) { + FieldCapabilities fieldCaps = mock(FieldCapabilities.class); + when(fieldCaps.isAggregatable()).thenReturn(isAggregatable); + return fieldCaps; + } +} diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ExtractedFieldsTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/fields/TimeBasedExtractedFieldsTests.java similarity index 80% rename from x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ExtractedFieldsTests.java rename to x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/fields/TimeBasedExtractedFieldsTests.java index d029cfea590..5e388afad28 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ExtractedFieldsTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/fields/TimeBasedExtractedFieldsTests.java @@ -3,12 +3,10 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ -package org.elasticsearch.xpack.ml.datafeed.extractor.scroll; +package org.elasticsearch.xpack.ml.datafeed.extractor.fields; -import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.fieldcaps.FieldCapabilities; import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse; -import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.fetch.subphase.DocValueFieldsContext; @@ -31,16 +29,16 @@ import static org.hamcrest.Matchers.equalTo; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -public class ExtractedFieldsTests extends ESTestCase { +public class TimeBasedExtractedFieldsTests extends ESTestCase { private ExtractedField timeField = ExtractedField.newTimeField("time", ExtractedField.ExtractionMethod.DOC_VALUE); public void testInvalidConstruction() { - expectThrows(IllegalArgumentException.class, () -> new ExtractedFields(timeField, Collections.emptyList())); + expectThrows(IllegalArgumentException.class, () -> new TimeBasedExtractedFields(timeField, Collections.emptyList())); } public void testTimeFieldOnly() { - ExtractedFields extractedFields = new ExtractedFields(timeField, Arrays.asList(timeField)); + TimeBasedExtractedFields extractedFields = new TimeBasedExtractedFields(timeField, Arrays.asList(timeField)); assertThat(extractedFields.getAllFields(), equalTo(Arrays.asList(timeField))); assertThat(extractedFields.timeField(), equalTo("time")); @@ -56,7 +54,7 @@ public class ExtractedFieldsTests extends ESTestCase { ExtractedField scriptField2 = ExtractedField.newField("scripted2", ExtractedField.ExtractionMethod.SCRIPT_FIELD); ExtractedField sourceField1 = ExtractedField.newField("src1", ExtractedField.ExtractionMethod.SOURCE); ExtractedField sourceField2 = ExtractedField.newField("src2", ExtractedField.ExtractionMethod.SOURCE); - ExtractedFields extractedFields = new ExtractedFields(timeField, Arrays.asList(timeField, + TimeBasedExtractedFields extractedFields = new TimeBasedExtractedFields(timeField, Arrays.asList(timeField, docValue1, docValue2, scriptField1, scriptField2, sourceField1, sourceField2)); assertThat(extractedFields.getAllFields().size(), equalTo(7)); @@ -67,31 +65,31 @@ public class ExtractedFieldsTests extends ESTestCase { } public void testTimeFieldValue() { - final long millis = randomLong(); - final SearchHit hit = new SearchHitBuilder(randomInt()).addField("time", new DateTime(millis)).build(); - final ExtractedFields extractedFields = new ExtractedFields(timeField, Collections.singletonList(timeField)); + long millis = randomLong(); + SearchHit hit = new SearchHitBuilder(randomInt()).addField("time", new DateTime(millis)).build(); + TimeBasedExtractedFields extractedFields = new TimeBasedExtractedFields(timeField, Collections.singletonList(timeField)); assertThat(extractedFields.timeFieldValue(hit), equalTo(millis)); } public void testStringTimeFieldValue() { - final long millis = randomLong(); - final SearchHit hit = new SearchHitBuilder(randomInt()).addField("time", Long.toString(millis)).build(); - final ExtractedFields extractedFields = new ExtractedFields(timeField, Collections.singletonList(timeField)); + long millis = randomLong(); + SearchHit hit = new SearchHitBuilder(randomInt()).addField("time", Long.toString(millis)).build(); + TimeBasedExtractedFields extractedFields = new TimeBasedExtractedFields(timeField, Collections.singletonList(timeField)); assertThat(extractedFields.timeFieldValue(hit), equalTo(millis)); } public void testPre6xTimeFieldValue() { // Prior to 6.x, timestamps were simply `long` milliseconds-past-the-epoch values - final long millis = randomLong(); - final SearchHit hit = new SearchHitBuilder(randomInt()).addField("time", millis).build(); - final ExtractedFields extractedFields = new ExtractedFields(timeField, Collections.singletonList(timeField)); + long millis = randomLong(); + SearchHit hit = new SearchHitBuilder(randomInt()).addField("time", millis).build(); + TimeBasedExtractedFields extractedFields = new TimeBasedExtractedFields(timeField, Collections.singletonList(timeField)); assertThat(extractedFields.timeFieldValue(hit), equalTo(millis)); } public void testTimeFieldValueGivenEmptyArray() { SearchHit hit = new SearchHitBuilder(1).addField("time", Collections.emptyList()).build(); - ExtractedFields extractedFields = new ExtractedFields(timeField, Arrays.asList(timeField)); + TimeBasedExtractedFields extractedFields = new TimeBasedExtractedFields(timeField, Arrays.asList(timeField)); expectThrows(RuntimeException.class, () -> extractedFields.timeFieldValue(hit)); } @@ -99,7 +97,7 @@ public class ExtractedFieldsTests extends ESTestCase { public void testTimeFieldValueGivenValueHasTwoElements() { SearchHit hit = new SearchHitBuilder(1).addField("time", Arrays.asList(1L, 2L)).build(); - ExtractedFields extractedFields = new ExtractedFields(timeField, Arrays.asList(timeField)); + TimeBasedExtractedFields extractedFields = new TimeBasedExtractedFields(timeField, Arrays.asList(timeField)); expectThrows(RuntimeException.class, () -> extractedFields.timeFieldValue(hit)); } @@ -107,7 +105,7 @@ public class ExtractedFieldsTests extends ESTestCase { public void testTimeFieldValueGivenValueIsString() { SearchHit hit = new SearchHitBuilder(1).addField("time", "a string").build(); - ExtractedFields extractedFields = new ExtractedFields(timeField, Arrays.asList(timeField)); + TimeBasedExtractedFields extractedFields = new TimeBasedExtractedFields(timeField, Arrays.asList(timeField)); expectThrows(RuntimeException.class, () -> extractedFields.timeFieldValue(hit)); } @@ -137,7 +135,7 @@ public class ExtractedFieldsTests extends ESTestCase { when(fieldCapabilitiesResponse.getField("value")).thenReturn(valueCaps); when(fieldCapabilitiesResponse.getField("airline")).thenReturn(airlineCaps); - ExtractedFields extractedFields = ExtractedFields.build(jobBuilder.build(new Date()), datafeedBuilder.build(), + TimeBasedExtractedFields extractedFields = TimeBasedExtractedFields.build(jobBuilder.build(new Date()), datafeedBuilder.build(), fieldCapabilitiesResponse); assertThat(extractedFields.timeField(), equalTo("time")); @@ -175,7 +173,7 @@ public class ExtractedFieldsTests extends ESTestCase { when(fieldCapabilitiesResponse.getField("airport")).thenReturn(text); when(fieldCapabilitiesResponse.getField("airport.keyword")).thenReturn(keyword); - ExtractedFields extractedFields = ExtractedFields.build(jobBuilder.build(new Date()), datafeedBuilder.build(), + TimeBasedExtractedFields extractedFields = TimeBasedExtractedFields.build(jobBuilder.build(new Date()), datafeedBuilder.build(), fieldCapabilitiesResponse); assertThat(extractedFields.timeField(), equalTo("time")); @@ -209,10 +207,9 @@ public class ExtractedFieldsTests extends ESTestCase { FieldCapabilitiesResponse fieldCapabilitiesResponse = mock(FieldCapabilitiesResponse.class); when(fieldCapabilitiesResponse.getField("time")).thenReturn(timeCaps); - ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, - () -> ExtractedFields.build(jobBuilder.build(new Date()), datafeedBuilder.build(), fieldCapabilitiesResponse)); - assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST)); - assertThat(e.getMessage(), equalTo("datafeed [feed] cannot retrieve time field [time] because it is not aggregatable")); + IllegalArgumentException e = expectThrows(IllegalArgumentException.class, + () -> TimeBasedExtractedFields.build(jobBuilder.build(new Date()), datafeedBuilder.build(), fieldCapabilitiesResponse)); + assertThat(e.getMessage(), equalTo("cannot retrieve time field [time] because it is not aggregatable")); } public void testBuildGivenTimeFieldIsNotAggregatableInSomeIndices() { @@ -231,10 +228,9 @@ public class ExtractedFieldsTests extends ESTestCase { FieldCapabilitiesResponse fieldCapabilitiesResponse = mock(FieldCapabilitiesResponse.class); when(fieldCapabilitiesResponse.getField("time")).thenReturn(timeCaps); - ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, - () -> ExtractedFields.build(jobBuilder.build(new Date()), datafeedBuilder.build(), fieldCapabilitiesResponse)); - assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST)); - assertThat(e.getMessage(), equalTo("datafeed [feed] cannot retrieve time field [time] because it is not aggregatable")); + IllegalArgumentException e = expectThrows(IllegalArgumentException.class, + () -> TimeBasedExtractedFields.build(jobBuilder.build(new Date()), datafeedBuilder.build(), fieldCapabilitiesResponse)); + assertThat(e.getMessage(), equalTo("cannot retrieve time field [time] because it is not aggregatable")); } public void testBuildGivenFieldWithoutMappings() { @@ -252,10 +248,9 @@ public class ExtractedFieldsTests extends ESTestCase { FieldCapabilitiesResponse fieldCapabilitiesResponse = mock(FieldCapabilitiesResponse.class); when(fieldCapabilitiesResponse.getField("time")).thenReturn(timeCaps); - ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, - () -> ExtractedFields.build(jobBuilder.build(new Date()), datafeedBuilder.build(), fieldCapabilitiesResponse)); - assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST)); - assertThat(e.getMessage(), equalTo("datafeed [feed] cannot retrieve field [value] because it has no mappings")); + IllegalArgumentException e = expectThrows(IllegalArgumentException.class, + () -> TimeBasedExtractedFields.build(jobBuilder.build(new Date()), datafeedBuilder.build(), fieldCapabilitiesResponse)); + assertThat(e.getMessage(), equalTo("cannot retrieve field [value] because it has no mappings")); } private static FieldCapabilities createFieldCaps(boolean isAggregatable) { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorTests.java index f72ae9b46b1..93a76c5402b 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorTests.java @@ -27,6 +27,8 @@ import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.ml.datafeed.extractor.fields.ExtractedField; +import org.elasticsearch.xpack.ml.datafeed.extractor.fields.TimeBasedExtractedFields; import org.junit.Before; import org.mockito.ArgumentCaptor; @@ -61,7 +63,7 @@ public class ScrollDataExtractorTests extends ESTestCase { private List capturedContinueScrollIds; private ArgumentCaptor capturedClearScrollRequests; private String jobId; - private ExtractedFields extractedFields; + private TimeBasedExtractedFields extractedFields; private List types; private List indices; private QueryBuilder query; @@ -128,7 +130,7 @@ public class ScrollDataExtractorTests extends ESTestCase { capturedContinueScrollIds = new ArrayList<>(); jobId = "test-job"; ExtractedField timeField = ExtractedField.newField("time", ExtractedField.ExtractionMethod.DOC_VALUE); - extractedFields = new ExtractedFields(timeField, + extractedFields = new TimeBasedExtractedFields(timeField, Arrays.asList(timeField, ExtractedField.newField("field_1", ExtractedField.ExtractionMethod.DOC_VALUE))); indices = Arrays.asList("index-1", "index-2"); types = Arrays.asList("type-1", "type-2"); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/SearchHitToJsonProcessorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/SearchHitToJsonProcessorTests.java index 60e14023d36..41a74814461 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/SearchHitToJsonProcessorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/SearchHitToJsonProcessorTests.java @@ -7,6 +7,9 @@ package org.elasticsearch.xpack.ml.datafeed.extractor.scroll; import org.elasticsearch.search.SearchHit; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.ml.datafeed.extractor.fields.ExtractedField; +import org.elasticsearch.xpack.ml.datafeed.extractor.fields.ExtractedFields; +import org.elasticsearch.xpack.ml.datafeed.extractor.fields.TimeBasedExtractedFields; import org.elasticsearch.xpack.ml.test.SearchHitBuilder; import java.io.ByteArrayOutputStream; @@ -23,7 +26,8 @@ public class SearchHitToJsonProcessorTests extends ESTestCase { ExtractedField missingField = ExtractedField.newField("missing", ExtractedField.ExtractionMethod.DOC_VALUE); ExtractedField singleField = ExtractedField.newField("single", ExtractedField.ExtractionMethod.DOC_VALUE); ExtractedField arrayField = ExtractedField.newField("array", ExtractedField.ExtractionMethod.DOC_VALUE); - ExtractedFields extractedFields = new ExtractedFields(timeField, Arrays.asList(timeField, missingField, singleField, arrayField)); + TimeBasedExtractedFields extractedFields = new TimeBasedExtractedFields(timeField, + Arrays.asList(timeField, missingField, singleField, arrayField)); SearchHit hit = new SearchHitBuilder(8) .addField("time", 1000L) @@ -41,7 +45,8 @@ public class SearchHitToJsonProcessorTests extends ESTestCase { ExtractedField missingField = ExtractedField.newField("missing", ExtractedField.ExtractionMethod.DOC_VALUE); ExtractedField singleField = ExtractedField.newField("single", ExtractedField.ExtractionMethod.DOC_VALUE); ExtractedField arrayField = ExtractedField.newField("array", ExtractedField.ExtractionMethod.DOC_VALUE); - ExtractedFields extractedFields = new ExtractedFields(timeField, Arrays.asList(timeField, missingField, singleField, arrayField)); + TimeBasedExtractedFields extractedFields = new TimeBasedExtractedFields(timeField, + Arrays.asList(timeField, missingField, singleField, arrayField)); SearchHit hit1 = new SearchHitBuilder(8) .addField("time", 1000L) diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/start_stop_datafeed.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/start_stop_datafeed.yml index d216ecfe13e..8e4e0c062b2 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/start_stop_datafeed.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/start_stop_datafeed.yml @@ -229,7 +229,7 @@ setup: job_id: "start-stop-datafeed-job-field-without-mappings" - do: - catch: /datafeed \[start-stop-datafeed-job-field-without-mappings-feed] cannot retrieve field \[airline2\] because it has no mappings/ + catch: /\[start-stop-datafeed-job-field-without-mappings-feed] cannot retrieve field \[airline2\] because it has no mappings/ xpack.ml.start_datafeed: datafeed_id: "start-stop-datafeed-job-field-without-mappings-feed"