[ML] Enable reusing field extraction logic when no time field is required (#35100)

This commit is contained in:
Dimitris Athanasiou 2018-10-31 10:55:11 +00:00 committed by GitHub
parent bf6f521827
commit 00dc2ba36f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 262 additions and 103 deletions

View File

@ -3,7 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License; * or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License. * you may not use this file except in compliance with the Elastic License.
*/ */
package org.elasticsearch.xpack.ml.datafeed.extractor.scroll; package org.elasticsearch.xpack.ml.datafeed.extractor.fields;
import org.elasticsearch.common.document.DocumentField; import org.elasticsearch.common.document.DocumentField;
import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHit;
@ -18,7 +18,7 @@ import java.util.Objects;
* Represents a field to be extracted by the datafeed. * Represents a field to be extracted by the datafeed.
* It encapsulates the extraction logic. * It encapsulates the extraction logic.
*/ */
abstract class ExtractedField { public abstract class ExtractedField {
public enum ExtractionMethod { public enum ExtractionMethod {
SOURCE, DOC_VALUE, SCRIPT_FIELD SOURCE, DOC_VALUE, SCRIPT_FIELD

View File

@ -3,18 +3,13 @@
* or more contributor license agreements. Licensed under the Elastic License; * or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License. * you may not use this file except in compliance with the Elastic License.
*/ */
package org.elasticsearch.xpack.ml.datafeed.extractor.scroll; package org.elasticsearch.xpack.ml.datafeed.extractor.fields;
import org.elasticsearch.action.fieldcaps.FieldCapabilities; import org.elasticsearch.action.fieldcaps.FieldCapabilities;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse; import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.ml.utils.MlStrings; import org.elasticsearch.xpack.core.ml.utils.MlStrings;
import java.util.ArrayList; import java.util.Collection;
import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -25,20 +20,15 @@ import java.util.stream.Collectors;
/** /**
* The fields the datafeed has to extract * The fields the datafeed has to extract
*/ */
class ExtractedFields { public class ExtractedFields {
private static final String TEXT = "text"; private static final String TEXT = "text";
private final ExtractedField timeField;
private final List<ExtractedField> allFields; private final List<ExtractedField> allFields;
private final List<ExtractedField> docValueFields; private final List<ExtractedField> docValueFields;
private final String[] sourceFields; private final String[] sourceFields;
ExtractedFields(ExtractedField timeField, List<ExtractedField> allFields) { public ExtractedFields(List<ExtractedField> allFields) {
if (!allFields.contains(timeField)) {
throw new IllegalArgumentException("timeField should also be contained in allFields");
}
this.timeField = Objects.requireNonNull(timeField);
this.allFields = Collections.unmodifiableList(allFields); this.allFields = Collections.unmodifiableList(allFields);
this.docValueFields = filterFields(ExtractedField.ExtractionMethod.DOC_VALUE, allFields); this.docValueFields = filterFields(ExtractedField.ExtractionMethod.DOC_VALUE, allFields);
this.sourceFields = filterFields(ExtractedField.ExtractionMethod.SOURCE, allFields).stream().map(ExtractedField::getName) this.sourceFields = filterFields(ExtractedField.ExtractionMethod.SOURCE, allFields).stream().map(ExtractedField::getName)
@ -61,60 +51,33 @@ class ExtractedFields {
return fields.stream().filter(field -> field.getExtractionMethod() == method).collect(Collectors.toList()); return fields.stream().filter(field -> field.getExtractionMethod() == method).collect(Collectors.toList());
} }
public String timeField() { public static ExtractedFields build(Collection<String> allFields, Set<String> scriptFields,
return timeField.getName(); FieldCapabilitiesResponse fieldsCapabilities) {
ExtractionMethodDetector extractionMethodDetector = new ExtractionMethodDetector(scriptFields, fieldsCapabilities);
return new ExtractedFields(allFields.stream().map(field -> extractionMethodDetector.detect(field)).collect(Collectors.toList()));
} }
public Long timeFieldValue(SearchHit hit) { protected static class ExtractionMethodDetector {
Object[] value = timeField.value(hit);
if (value.length != 1) {
throw new RuntimeException("Time field [" + timeField.getAlias() + "] expected a single value; actual was: "
+ Arrays.toString(value));
}
if (value[0] instanceof Long) {
return (Long) value[0];
}
throw new RuntimeException("Time field [" + timeField.getAlias() + "] expected a long value; actual was: " + value[0]);
}
public static ExtractedFields build(Job job, DatafeedConfig datafeed, FieldCapabilitiesResponse fieldsCapabilities) {
Set<String> scriptFields = datafeed.getScriptFields().stream().map(sf -> sf.fieldName()).collect(Collectors.toSet());
ExtractionMethodDetector extractionMethodDetector = new ExtractionMethodDetector(datafeed.getId(), scriptFields,
fieldsCapabilities);
String timeField = job.getDataDescription().getTimeField();
if (scriptFields.contains(timeField) == false && extractionMethodDetector.isAggregatable(timeField) == false) {
throw ExceptionsHelper.badRequestException("datafeed [" + datafeed.getId() + "] cannot retrieve time field [" + timeField
+ "] because it is not aggregatable");
}
ExtractedField timeExtractedField = ExtractedField.newTimeField(timeField, scriptFields.contains(timeField) ?
ExtractedField.ExtractionMethod.SCRIPT_FIELD : ExtractedField.ExtractionMethod.DOC_VALUE);
List<String> remainingFields = job.allInputFields().stream().filter(f -> !f.equals(timeField)).collect(Collectors.toList());
List<ExtractedField> allExtractedFields = new ArrayList<>(remainingFields.size() + 1);
allExtractedFields.add(timeExtractedField);
remainingFields.stream().forEach(field -> allExtractedFields.add(extractionMethodDetector.detect(field)));
return new ExtractedFields(timeExtractedField, allExtractedFields);
}
private static class ExtractionMethodDetector {
private final String datafeedId;
private final Set<String> scriptFields; private final Set<String> scriptFields;
private final FieldCapabilitiesResponse fieldsCapabilities; private final FieldCapabilitiesResponse fieldsCapabilities;
private ExtractionMethodDetector(String datafeedId, Set<String> scriptFields, FieldCapabilitiesResponse fieldsCapabilities) { protected ExtractionMethodDetector(Set<String> scriptFields, FieldCapabilitiesResponse fieldsCapabilities) {
this.datafeedId = datafeedId;
this.scriptFields = scriptFields; this.scriptFields = scriptFields;
this.fieldsCapabilities = fieldsCapabilities; this.fieldsCapabilities = fieldsCapabilities;
} }
private ExtractedField detect(String field) { protected ExtractedField detect(String field) {
String internalField = field; String internalField = field;
ExtractedField.ExtractionMethod method = ExtractedField.ExtractionMethod.SOURCE; ExtractedField.ExtractionMethod method = ExtractedField.ExtractionMethod.SOURCE;
if (scriptFields.contains(field)) { if (scriptFields.contains(field)) {
method = ExtractedField.ExtractionMethod.SCRIPT_FIELD; method = ExtractedField.ExtractionMethod.SCRIPT_FIELD;
} else if (isAggregatable(field)) { } else if (isAggregatable(field)) {
method = ExtractedField.ExtractionMethod.DOC_VALUE; method = ExtractedField.ExtractionMethod.DOC_VALUE;
} else if (isText(field)) { if (isFieldOfType(field, "date")) {
return ExtractedField.newTimeField(field, method);
}
} else if (isFieldOfType(field, TEXT)) {
String parentField = MlStrings.getParentField(field); String parentField = MlStrings.getParentField(field);
// Field is text so check if it is a multi-field // Field is text so check if it is a multi-field
if (Objects.equals(parentField, field) == false && fieldsCapabilities.getField(parentField) != null) { if (Objects.equals(parentField, field) == false && fieldsCapabilities.getField(parentField) != null) {
@ -127,11 +90,10 @@ class ExtractedFields {
return ExtractedField.newField(field, internalField, method); return ExtractedField.newField(field, internalField, method);
} }
private boolean isAggregatable(String field) { protected boolean isAggregatable(String field) {
Map<String, FieldCapabilities> fieldCaps = fieldsCapabilities.getField(field); Map<String, FieldCapabilities> fieldCaps = fieldsCapabilities.getField(field);
if (fieldCaps == null || fieldCaps.isEmpty()) { if (fieldCaps == null || fieldCaps.isEmpty()) {
throw ExceptionsHelper.badRequestException("datafeed [" + datafeedId + "] cannot retrieve field [" + field throw new IllegalArgumentException("cannot retrieve field [" + field + "] because it has no mappings");
+ "] because it has no mappings");
} }
for (FieldCapabilities capsPerIndex : fieldCaps.values()) { for (FieldCapabilities capsPerIndex : fieldCaps.values()) {
if (!capsPerIndex.isAggregatable()) { if (!capsPerIndex.isAggregatable()) {
@ -141,10 +103,10 @@ class ExtractedFields {
return true; return true;
} }
private boolean isText(String field) { private boolean isFieldOfType(String field, String type) {
Map<String, FieldCapabilities> fieldCaps = fieldsCapabilities.getField(field); Map<String, FieldCapabilities> fieldCaps = fieldsCapabilities.getField(field);
if (fieldCaps != null && fieldCaps.size() == 1) { if (fieldCaps != null && fieldCaps.size() == 1) {
return fieldCaps.containsKey(TEXT); return fieldCaps.containsKey(type);
} }
return false; return false;
} }

View File

@ -0,0 +1,66 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.datafeed.extractor.fields;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
/**
 * The fields to extract for a datafeed that requires a time field.
 * In addition to the plain list of fields kept by {@link ExtractedFields},
 * this class remembers which of them carries the timestamp.
 */
public class TimeBasedExtractedFields extends ExtractedFields {

    private final ExtractedField timeField;

    /**
     * @param timeField the field that carries the timestamp
     * @param allFields every field to extract; has to include {@code timeField}
     * @throws IllegalArgumentException if {@code timeField} is not contained in {@code allFields}
     */
    public TimeBasedExtractedFields(ExtractedField timeField, List<ExtractedField> allFields) {
        super(allFields);
        boolean timeFieldListed = allFields.contains(timeField);
        if (timeFieldListed == false) {
            throw new IllegalArgumentException("timeField should also be contained in allFields");
        }
        this.timeField = Objects.requireNonNull(timeField);
    }

    /**
     * @return the name of the time field
     */
    public String timeField() {
        return timeField.getName();
    }

    /**
     * Reads the time field from a search hit.
     *
     * @param hit the hit to read the time field from
     * @return the single {@code Long} value held by the time field
     * @throws RuntimeException if the hit does not hold exactly one value for the time field,
     *                          or if that value is not a {@code Long}
     */
    public Long timeFieldValue(SearchHit hit) {
        Object[] extracted = timeField.value(hit);
        if (extracted.length != 1) {
            throw new RuntimeException("Time field [" + timeField.getAlias() + "] expected a single value; actual was: "
                + Arrays.toString(extracted));
        }
        Object only = extracted[0];
        if (only instanceof Long == false) {
            throw new RuntimeException("Time field [" + timeField.getAlias() + "] expected a long value; actual was: " + only);
        }
        return (Long) only;
    }

    /**
     * Works out how each input field of the job should be extracted and puts the time field first.
     *
     * @param job the job whose data description names the time field and whose input fields are extracted
     * @param datafeed the datafeed whose script fields are extracted as script fields
     * @param fieldsCapabilities the field capabilities used to pick an extraction method per field
     * @return the extracted fields, with the time field as the first entry
     * @throws IllegalArgumentException if the time field is neither a script field nor aggregatable
     */
    public static TimeBasedExtractedFields build(Job job, DatafeedConfig datafeed, FieldCapabilitiesResponse fieldsCapabilities) {
        Set<String> scriptFieldNames = datafeed.getScriptFields().stream().map(sf -> sf.fieldName()).collect(Collectors.toSet());
        ExtractionMethodDetector detector = new ExtractionMethodDetector(scriptFieldNames, fieldsCapabilities);

        String timeFieldName = job.getDataDescription().getTimeField();
        boolean timeFieldIsScripted = scriptFieldNames.contains(timeFieldName);
        // The aggregatable check only runs for time fields that are not script fields
        if (timeFieldIsScripted == false && detector.isAggregatable(timeFieldName) == false) {
            throw new IllegalArgumentException("cannot retrieve time field [" + timeFieldName + "] because it is not aggregatable");
        }
        ExtractedField.ExtractionMethod timeFieldMethod = timeFieldIsScripted
            ? ExtractedField.ExtractionMethod.SCRIPT_FIELD
            : ExtractedField.ExtractionMethod.DOC_VALUE;
        ExtractedField timeExtractedField = ExtractedField.newTimeField(timeFieldName, timeFieldMethod);

        // The time field was handled above, so it is left out of the fields passed to the detector
        List<String> otherFieldNames = job.allInputFields().stream()
            .filter(candidate -> candidate.equals(timeFieldName) == false)
            .collect(Collectors.toList());
        List<ExtractedField> allExtractedFields = new ArrayList<>(otherFieldNames.size() + 1);
        allExtractedFields.add(timeExtractedField);
        for (String fieldName : otherFieldNames) {
            allExtractedFields.add(detector.detect(fieldName));
        }
        return new TimeBasedExtractedFields(timeExtractedField, allExtractedFields);
    }
}

View File

@ -23,6 +23,7 @@ import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor; import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor;
import org.elasticsearch.xpack.core.ml.datafeed.extractor.ExtractorUtils; import org.elasticsearch.xpack.core.ml.datafeed.extractor.ExtractorUtils;
import org.elasticsearch.xpack.ml.datafeed.extractor.fields.ExtractedField;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;

View File

@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml.datafeed.extractor.scroll;
import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.ml.datafeed.extractor.fields.TimeBasedExtractedFields;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -15,7 +16,7 @@ import java.util.Objects;
class ScrollDataExtractorContext { class ScrollDataExtractorContext {
final String jobId; final String jobId;
final ExtractedFields extractedFields; final TimeBasedExtractedFields extractedFields;
final String[] indices; final String[] indices;
final String[] types; final String[] types;
final QueryBuilder query; final QueryBuilder query;
@ -25,7 +26,7 @@ class ScrollDataExtractorContext {
final long end; final long end;
final Map<String, String> headers; final Map<String, String> headers;
ScrollDataExtractorContext(String jobId, ExtractedFields extractedFields, List<String> indices, List<String> types, ScrollDataExtractorContext(String jobId, TimeBasedExtractedFields extractedFields, List<String> indices, List<String> types,
QueryBuilder query, List<SearchSourceBuilder.ScriptField> scriptFields, int scrollSize, QueryBuilder query, List<SearchSourceBuilder.ScriptField> scriptFields, int scrollSize,
long start, long end, Map<String, String> headers) { long start, long end, Map<String, String> headers) {
this.jobId = Objects.requireNonNull(jobId); this.jobId = Objects.requireNonNull(jobId);

View File

@ -16,8 +16,10 @@ import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor; import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor;
import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.ml.utils.MlStrings; import org.elasticsearch.xpack.core.ml.utils.MlStrings;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory; import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
import org.elasticsearch.xpack.ml.datafeed.extractor.fields.TimeBasedExtractedFields;
import java.util.Objects; import java.util.Objects;
@ -26,9 +28,9 @@ public class ScrollDataExtractorFactory implements DataExtractorFactory {
private final Client client; private final Client client;
private final DatafeedConfig datafeedConfig; private final DatafeedConfig datafeedConfig;
private final Job job; private final Job job;
private final ExtractedFields extractedFields; private final TimeBasedExtractedFields extractedFields;
private ScrollDataExtractorFactory(Client client, DatafeedConfig datafeedConfig, Job job, ExtractedFields extractedFields) { private ScrollDataExtractorFactory(Client client, DatafeedConfig datafeedConfig, Job job, TimeBasedExtractedFields extractedFields) {
this.client = Objects.requireNonNull(client); this.client = Objects.requireNonNull(client);
this.datafeedConfig = Objects.requireNonNull(datafeedConfig); this.datafeedConfig = Objects.requireNonNull(datafeedConfig);
this.job = Objects.requireNonNull(job); this.job = Objects.requireNonNull(job);
@ -56,12 +58,14 @@ public class ScrollDataExtractorFactory implements DataExtractorFactory {
// Step 2. Contruct the factory and notify listener // Step 2. Contruct the factory and notify listener
ActionListener<FieldCapabilitiesResponse> fieldCapabilitiesHandler = ActionListener.wrap( ActionListener<FieldCapabilitiesResponse> fieldCapabilitiesHandler = ActionListener.wrap(
fieldCapabilitiesResponse -> { fieldCapabilitiesResponse -> {
ExtractedFields extractedFields = ExtractedFields.build(job, datafeed, fieldCapabilitiesResponse); TimeBasedExtractedFields extractedFields = TimeBasedExtractedFields.build(job, datafeed, fieldCapabilitiesResponse);
listener.onResponse(new ScrollDataExtractorFactory(client, datafeed, job, extractedFields)); listener.onResponse(new ScrollDataExtractorFactory(client, datafeed, job, extractedFields));
}, e -> { }, e -> {
if (e instanceof IndexNotFoundException) { if (e instanceof IndexNotFoundException) {
listener.onFailure(new ResourceNotFoundException("datafeed [" + datafeed.getId() listener.onFailure(new ResourceNotFoundException("datafeed [" + datafeed.getId()
+ "] cannot retrieve data because index " + ((IndexNotFoundException) e).getIndex() + " does not exist")); + "] cannot retrieve data because index " + ((IndexNotFoundException) e).getIndex() + " does not exist"));
} else if (e instanceof IllegalArgumentException) {
listener.onFailure(ExceptionsHelper.badRequestException("[" + datafeed.getId() + "] " + e.getMessage()));
} else { } else {
listener.onFailure(e); listener.onFailure(e);
} }

View File

@ -9,6 +9,8 @@ import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.ml.datafeed.extractor.fields.ExtractedField;
import org.elasticsearch.xpack.ml.datafeed.extractor.fields.ExtractedFields;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;

View File

@ -3,11 +3,12 @@
* or more contributor license agreements. Licensed under the Elastic License; * or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License. * you may not use this file except in compliance with the Elastic License.
*/ */
package org.elasticsearch.xpack.ml.datafeed.extractor.scroll; package org.elasticsearch.xpack.ml.datafeed.extractor.fields;
import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.fetch.subphase.DocValueFieldsContext; import org.elasticsearch.search.fetch.subphase.DocValueFieldsContext;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.datafeed.extractor.fields.ExtractedField;
import org.elasticsearch.xpack.ml.test.SearchHitBuilder; import org.elasticsearch.xpack.ml.test.SearchHitBuilder;
import org.joda.time.DateTime; import org.joda.time.DateTime;

View File

@ -0,0 +1,120 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.datafeed.extractor.fields;
import org.elasticsearch.action.fieldcaps.FieldCapabilities;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
import org.elasticsearch.search.fetch.subphase.DocValueFieldsContext;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
import org.elasticsearch.xpack.core.ml.job.config.Detector;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class ExtractedFieldsTests extends ESTestCase {

    /** Fields handed to the constructor are split into doc value and source fields by extraction method. */
    public void testAllTypesOfFields() {
        ExtractedField firstDocValue = ExtractedField.newField("doc1", ExtractedField.ExtractionMethod.DOC_VALUE);
        ExtractedField secondDocValue = ExtractedField.newField("doc2", ExtractedField.ExtractionMethod.DOC_VALUE);
        ExtractedField firstScripted = ExtractedField.newField("scripted1", ExtractedField.ExtractionMethod.SCRIPT_FIELD);
        ExtractedField secondScripted = ExtractedField.newField("scripted2", ExtractedField.ExtractionMethod.SCRIPT_FIELD);
        ExtractedField firstSource = ExtractedField.newField("src1", ExtractedField.ExtractionMethod.SOURCE);
        ExtractedField secondSource = ExtractedField.newField("src2", ExtractedField.ExtractionMethod.SOURCE);

        ExtractedFields fields = new ExtractedFields(Arrays.asList(
            firstDocValue, secondDocValue, firstScripted, secondScripted, firstSource, secondSource));

        assertThat(fields.getAllFields().size(), equalTo(6));
        String[] docValueNames = fields.getDocValueFields().stream().map(ExtractedField::getName).toArray(String[]::new);
        assertThat(docValueNames, equalTo(new String[] {"doc1", "doc2"}));
        assertThat(fields.getSourceFields(), equalTo(new String[] {"src1", "src2"}));
    }

    /** A date field, a multi-type aggregatable field, a text field and a script field each get their own extraction. */
    public void testBuildGivenMixtureOfTypes() {
        // Capability maps are fully built before any stubbing starts
        Map<String, FieldCapabilities> dateCaps = new HashMap<>();
        dateCaps.put("date", createFieldCaps(true));
        Map<String, FieldCapabilities> floatAndKeywordCaps = new HashMap<>();
        floatAndKeywordCaps.put("float", createFieldCaps(true));
        floatAndKeywordCaps.put("keyword", createFieldCaps(true));
        Map<String, FieldCapabilities> textCaps = new HashMap<>();
        textCaps.put("text", createFieldCaps(false));

        FieldCapabilitiesResponse capabilities = mock(FieldCapabilitiesResponse.class);
        when(capabilities.getField("time")).thenReturn(dateCaps);
        when(capabilities.getField("value")).thenReturn(floatAndKeywordCaps);
        when(capabilities.getField("airline")).thenReturn(textCaps);

        ExtractedFields fields = ExtractedFields.build(
            Arrays.asList("time", "value", "airline", "airport"),
            new HashSet<>(Collections.singletonList("airport")),
            capabilities);

        assertThat(fields.getDocValueFields().size(), equalTo(2));
        assertThat(fields.getDocValueFields().get(0).getName(), equalTo("time"));
        assertThat(fields.getDocValueFields().get(0).getDocValueFormat(), equalTo("epoch_millis"));
        assertThat(fields.getDocValueFields().get(1).getName(), equalTo("value"));
        assertThat(fields.getDocValueFields().get(1).getDocValueFormat(), equalTo(DocValueFieldsContext.USE_DEFAULT_FORMAT));
        assertThat(fields.getSourceFields(), equalTo(new String[] {"airline"}));
        assertThat(fields.getAllFields().size(), equalTo(4));
    }

    /** A text multi-field is read from its parent in source while keeping its own name as alias. */
    public void testBuildGivenMultiFields() {
        // NOTE(review): the job and datafeed builders below are not passed to ExtractedFields.build
        Job.Builder jobBuilder = new Job.Builder("foo");
        jobBuilder.setDataDescription(new DataDescription.Builder());
        Detector.Builder detector = new Detector.Builder("count", null);
        detector.setByFieldName("airline.text");
        detector.setOverFieldName("airport.keyword");
        jobBuilder.setAnalysisConfig(new AnalysisConfig.Builder(Collections.singletonList(detector.build())));

        DatafeedConfig.Builder datafeedBuilder = new DatafeedConfig.Builder("feed", jobBuilder.getId());
        datafeedBuilder.setIndices(Collections.singletonList("foo"));

        Map<String, FieldCapabilities> textCaps = new HashMap<>();
        textCaps.put("text", createFieldCaps(false));
        Map<String, FieldCapabilities> keywordCaps = new HashMap<>();
        keywordCaps.put("keyword", createFieldCaps(true));

        FieldCapabilitiesResponse capabilities = mock(FieldCapabilitiesResponse.class);
        when(capabilities.getField("airline")).thenReturn(textCaps);
        when(capabilities.getField("airline.text")).thenReturn(textCaps);
        when(capabilities.getField("airport")).thenReturn(textCaps);
        when(capabilities.getField("airport.keyword")).thenReturn(keywordCaps);

        ExtractedFields fields = ExtractedFields.build(
            Arrays.asList("airline.text", "airport.keyword"), Collections.emptySet(), capabilities);

        assertThat(fields.getDocValueFields().size(), equalTo(1));
        assertThat(fields.getDocValueFields().get(0).getName(), equalTo("airport.keyword"));
        assertThat(fields.getSourceFields().length, equalTo(1));
        assertThat(fields.getSourceFields()[0], equalTo("airline"));
        assertThat(fields.getAllFields().size(), equalTo(2));
        assertThat(aliasOfFieldNamed(fields, "airport.keyword"), equalTo("airport.keyword"));
        assertThat(aliasOfFieldNamed(fields, "airline"), equalTo("airline.text"));
    }

    /** A field the capabilities response knows nothing about is rejected. */
    public void testBuildGivenFieldWithoutMappings() {
        FieldCapabilitiesResponse capabilities = mock(FieldCapabilitiesResponse.class);

        IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
            () -> ExtractedFields.build(Collections.singletonList("value"), Collections.emptySet(), capabilities));

        assertThat(e.getMessage(), equalTo("cannot retrieve field [value] because it has no mappings"));
    }

    /** Returns the alias of the first extracted field with the given name. */
    private static String aliasOfFieldNamed(ExtractedFields fields, String name) {
        return fields.getAllFields().stream().filter(f -> f.getName().equals(name)).findFirst().get().getAlias();
    }

    /** Creates a mocked {@link FieldCapabilities} reporting the given aggregatable flag. */
    private static FieldCapabilities createFieldCaps(boolean isAggregatable) {
        FieldCapabilities mockedCaps = mock(FieldCapabilities.class);
        when(mockedCaps.isAggregatable()).thenReturn(isAggregatable);
        return mockedCaps;
    }
}

View File

@ -3,12 +3,10 @@
* or more contributor license agreements. Licensed under the Elastic License; * or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License. * you may not use this file except in compliance with the Elastic License.
*/ */
package org.elasticsearch.xpack.ml.datafeed.extractor.scroll; package org.elasticsearch.xpack.ml.datafeed.extractor.fields;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.fieldcaps.FieldCapabilities; import org.elasticsearch.action.fieldcaps.FieldCapabilities;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse; import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.DocValueFieldsContext; import org.elasticsearch.search.fetch.subphase.DocValueFieldsContext;
@ -31,16 +29,16 @@ import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
public class ExtractedFieldsTests extends ESTestCase { public class TimeBasedExtractedFieldsTests extends ESTestCase {
private ExtractedField timeField = ExtractedField.newTimeField("time", ExtractedField.ExtractionMethod.DOC_VALUE); private ExtractedField timeField = ExtractedField.newTimeField("time", ExtractedField.ExtractionMethod.DOC_VALUE);
public void testInvalidConstruction() { public void testInvalidConstruction() {
expectThrows(IllegalArgumentException.class, () -> new ExtractedFields(timeField, Collections.emptyList())); expectThrows(IllegalArgumentException.class, () -> new TimeBasedExtractedFields(timeField, Collections.emptyList()));
} }
public void testTimeFieldOnly() { public void testTimeFieldOnly() {
ExtractedFields extractedFields = new ExtractedFields(timeField, Arrays.asList(timeField)); TimeBasedExtractedFields extractedFields = new TimeBasedExtractedFields(timeField, Arrays.asList(timeField));
assertThat(extractedFields.getAllFields(), equalTo(Arrays.asList(timeField))); assertThat(extractedFields.getAllFields(), equalTo(Arrays.asList(timeField)));
assertThat(extractedFields.timeField(), equalTo("time")); assertThat(extractedFields.timeField(), equalTo("time"));
@ -56,7 +54,7 @@ public class ExtractedFieldsTests extends ESTestCase {
ExtractedField scriptField2 = ExtractedField.newField("scripted2", ExtractedField.ExtractionMethod.SCRIPT_FIELD); ExtractedField scriptField2 = ExtractedField.newField("scripted2", ExtractedField.ExtractionMethod.SCRIPT_FIELD);
ExtractedField sourceField1 = ExtractedField.newField("src1", ExtractedField.ExtractionMethod.SOURCE); ExtractedField sourceField1 = ExtractedField.newField("src1", ExtractedField.ExtractionMethod.SOURCE);
ExtractedField sourceField2 = ExtractedField.newField("src2", ExtractedField.ExtractionMethod.SOURCE); ExtractedField sourceField2 = ExtractedField.newField("src2", ExtractedField.ExtractionMethod.SOURCE);
ExtractedFields extractedFields = new ExtractedFields(timeField, Arrays.asList(timeField, TimeBasedExtractedFields extractedFields = new TimeBasedExtractedFields(timeField, Arrays.asList(timeField,
docValue1, docValue2, scriptField1, scriptField2, sourceField1, sourceField2)); docValue1, docValue2, scriptField1, scriptField2, sourceField1, sourceField2));
assertThat(extractedFields.getAllFields().size(), equalTo(7)); assertThat(extractedFields.getAllFields().size(), equalTo(7));
@ -67,31 +65,31 @@ public class ExtractedFieldsTests extends ESTestCase {
} }
public void testTimeFieldValue() { public void testTimeFieldValue() {
final long millis = randomLong(); long millis = randomLong();
final SearchHit hit = new SearchHitBuilder(randomInt()).addField("time", new DateTime(millis)).build(); SearchHit hit = new SearchHitBuilder(randomInt()).addField("time", new DateTime(millis)).build();
final ExtractedFields extractedFields = new ExtractedFields(timeField, Collections.singletonList(timeField)); TimeBasedExtractedFields extractedFields = new TimeBasedExtractedFields(timeField, Collections.singletonList(timeField));
assertThat(extractedFields.timeFieldValue(hit), equalTo(millis)); assertThat(extractedFields.timeFieldValue(hit), equalTo(millis));
} }
public void testStringTimeFieldValue() { public void testStringTimeFieldValue() {
final long millis = randomLong(); long millis = randomLong();
final SearchHit hit = new SearchHitBuilder(randomInt()).addField("time", Long.toString(millis)).build(); SearchHit hit = new SearchHitBuilder(randomInt()).addField("time", Long.toString(millis)).build();
final ExtractedFields extractedFields = new ExtractedFields(timeField, Collections.singletonList(timeField)); TimeBasedExtractedFields extractedFields = new TimeBasedExtractedFields(timeField, Collections.singletonList(timeField));
assertThat(extractedFields.timeFieldValue(hit), equalTo(millis)); assertThat(extractedFields.timeFieldValue(hit), equalTo(millis));
} }
public void testPre6xTimeFieldValue() { public void testPre6xTimeFieldValue() {
// Prior to 6.x, timestamps were simply `long` milliseconds-past-the-epoch values // Prior to 6.x, timestamps were simply `long` milliseconds-past-the-epoch values
final long millis = randomLong(); long millis = randomLong();
final SearchHit hit = new SearchHitBuilder(randomInt()).addField("time", millis).build(); SearchHit hit = new SearchHitBuilder(randomInt()).addField("time", millis).build();
final ExtractedFields extractedFields = new ExtractedFields(timeField, Collections.singletonList(timeField)); TimeBasedExtractedFields extractedFields = new TimeBasedExtractedFields(timeField, Collections.singletonList(timeField));
assertThat(extractedFields.timeFieldValue(hit), equalTo(millis)); assertThat(extractedFields.timeFieldValue(hit), equalTo(millis));
} }
public void testTimeFieldValueGivenEmptyArray() { public void testTimeFieldValueGivenEmptyArray() {
SearchHit hit = new SearchHitBuilder(1).addField("time", Collections.emptyList()).build(); SearchHit hit = new SearchHitBuilder(1).addField("time", Collections.emptyList()).build();
ExtractedFields extractedFields = new ExtractedFields(timeField, Arrays.asList(timeField)); TimeBasedExtractedFields extractedFields = new TimeBasedExtractedFields(timeField, Arrays.asList(timeField));
expectThrows(RuntimeException.class, () -> extractedFields.timeFieldValue(hit)); expectThrows(RuntimeException.class, () -> extractedFields.timeFieldValue(hit));
} }
@ -99,7 +97,7 @@ public class ExtractedFieldsTests extends ESTestCase {
public void testTimeFieldValueGivenValueHasTwoElements() { public void testTimeFieldValueGivenValueHasTwoElements() {
SearchHit hit = new SearchHitBuilder(1).addField("time", Arrays.asList(1L, 2L)).build(); SearchHit hit = new SearchHitBuilder(1).addField("time", Arrays.asList(1L, 2L)).build();
ExtractedFields extractedFields = new ExtractedFields(timeField, Arrays.asList(timeField)); TimeBasedExtractedFields extractedFields = new TimeBasedExtractedFields(timeField, Arrays.asList(timeField));
expectThrows(RuntimeException.class, () -> extractedFields.timeFieldValue(hit)); expectThrows(RuntimeException.class, () -> extractedFields.timeFieldValue(hit));
} }
@ -107,7 +105,7 @@ public class ExtractedFieldsTests extends ESTestCase {
public void testTimeFieldValueGivenValueIsString() { public void testTimeFieldValueGivenValueIsString() {
SearchHit hit = new SearchHitBuilder(1).addField("time", "a string").build(); SearchHit hit = new SearchHitBuilder(1).addField("time", "a string").build();
ExtractedFields extractedFields = new ExtractedFields(timeField, Arrays.asList(timeField)); TimeBasedExtractedFields extractedFields = new TimeBasedExtractedFields(timeField, Arrays.asList(timeField));
expectThrows(RuntimeException.class, () -> extractedFields.timeFieldValue(hit)); expectThrows(RuntimeException.class, () -> extractedFields.timeFieldValue(hit));
} }
@ -137,7 +135,7 @@ public class ExtractedFieldsTests extends ESTestCase {
when(fieldCapabilitiesResponse.getField("value")).thenReturn(valueCaps); when(fieldCapabilitiesResponse.getField("value")).thenReturn(valueCaps);
when(fieldCapabilitiesResponse.getField("airline")).thenReturn(airlineCaps); when(fieldCapabilitiesResponse.getField("airline")).thenReturn(airlineCaps);
ExtractedFields extractedFields = ExtractedFields.build(jobBuilder.build(new Date()), datafeedBuilder.build(), TimeBasedExtractedFields extractedFields = TimeBasedExtractedFields.build(jobBuilder.build(new Date()), datafeedBuilder.build(),
fieldCapabilitiesResponse); fieldCapabilitiesResponse);
assertThat(extractedFields.timeField(), equalTo("time")); assertThat(extractedFields.timeField(), equalTo("time"));
@ -175,7 +173,7 @@ public class ExtractedFieldsTests extends ESTestCase {
when(fieldCapabilitiesResponse.getField("airport")).thenReturn(text); when(fieldCapabilitiesResponse.getField("airport")).thenReturn(text);
when(fieldCapabilitiesResponse.getField("airport.keyword")).thenReturn(keyword); when(fieldCapabilitiesResponse.getField("airport.keyword")).thenReturn(keyword);
ExtractedFields extractedFields = ExtractedFields.build(jobBuilder.build(new Date()), datafeedBuilder.build(), TimeBasedExtractedFields extractedFields = TimeBasedExtractedFields.build(jobBuilder.build(new Date()), datafeedBuilder.build(),
fieldCapabilitiesResponse); fieldCapabilitiesResponse);
assertThat(extractedFields.timeField(), equalTo("time")); assertThat(extractedFields.timeField(), equalTo("time"));
@ -209,10 +207,9 @@ public class ExtractedFieldsTests extends ESTestCase {
FieldCapabilitiesResponse fieldCapabilitiesResponse = mock(FieldCapabilitiesResponse.class); FieldCapabilitiesResponse fieldCapabilitiesResponse = mock(FieldCapabilitiesResponse.class);
when(fieldCapabilitiesResponse.getField("time")).thenReturn(timeCaps); when(fieldCapabilitiesResponse.getField("time")).thenReturn(timeCaps);
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
() -> ExtractedFields.build(jobBuilder.build(new Date()), datafeedBuilder.build(), fieldCapabilitiesResponse)); () -> TimeBasedExtractedFields.build(jobBuilder.build(new Date()), datafeedBuilder.build(), fieldCapabilitiesResponse));
assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST)); assertThat(e.getMessage(), equalTo("cannot retrieve time field [time] because it is not aggregatable"));
assertThat(e.getMessage(), equalTo("datafeed [feed] cannot retrieve time field [time] because it is not aggregatable"));
} }
public void testBuildGivenTimeFieldIsNotAggregatableInSomeIndices() { public void testBuildGivenTimeFieldIsNotAggregatableInSomeIndices() {
@ -231,10 +228,9 @@ public class ExtractedFieldsTests extends ESTestCase {
FieldCapabilitiesResponse fieldCapabilitiesResponse = mock(FieldCapabilitiesResponse.class); FieldCapabilitiesResponse fieldCapabilitiesResponse = mock(FieldCapabilitiesResponse.class);
when(fieldCapabilitiesResponse.getField("time")).thenReturn(timeCaps); when(fieldCapabilitiesResponse.getField("time")).thenReturn(timeCaps);
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
() -> ExtractedFields.build(jobBuilder.build(new Date()), datafeedBuilder.build(), fieldCapabilitiesResponse)); () -> TimeBasedExtractedFields.build(jobBuilder.build(new Date()), datafeedBuilder.build(), fieldCapabilitiesResponse));
assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST)); assertThat(e.getMessage(), equalTo("cannot retrieve time field [time] because it is not aggregatable"));
assertThat(e.getMessage(), equalTo("datafeed [feed] cannot retrieve time field [time] because it is not aggregatable"));
} }
public void testBuildGivenFieldWithoutMappings() { public void testBuildGivenFieldWithoutMappings() {
@ -252,10 +248,9 @@ public class ExtractedFieldsTests extends ESTestCase {
FieldCapabilitiesResponse fieldCapabilitiesResponse = mock(FieldCapabilitiesResponse.class); FieldCapabilitiesResponse fieldCapabilitiesResponse = mock(FieldCapabilitiesResponse.class);
when(fieldCapabilitiesResponse.getField("time")).thenReturn(timeCaps); when(fieldCapabilitiesResponse.getField("time")).thenReturn(timeCaps);
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
() -> ExtractedFields.build(jobBuilder.build(new Date()), datafeedBuilder.build(), fieldCapabilitiesResponse)); () -> TimeBasedExtractedFields.build(jobBuilder.build(new Date()), datafeedBuilder.build(), fieldCapabilitiesResponse));
assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST)); assertThat(e.getMessage(), equalTo("cannot retrieve field [value] because it has no mappings"));
assertThat(e.getMessage(), equalTo("datafeed [feed] cannot retrieve field [value] because it has no mappings"));
} }
private static FieldCapabilities createFieldCaps(boolean isAggregatable) { private static FieldCapabilities createFieldCaps(boolean isAggregatable) {

View File

@ -27,6 +27,8 @@ import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ml.datafeed.extractor.fields.ExtractedField;
import org.elasticsearch.xpack.ml.datafeed.extractor.fields.TimeBasedExtractedFields;
import org.junit.Before; import org.junit.Before;
import org.mockito.ArgumentCaptor; import org.mockito.ArgumentCaptor;
@ -61,7 +63,7 @@ public class ScrollDataExtractorTests extends ESTestCase {
private List<String> capturedContinueScrollIds; private List<String> capturedContinueScrollIds;
private ArgumentCaptor<ClearScrollRequest> capturedClearScrollRequests; private ArgumentCaptor<ClearScrollRequest> capturedClearScrollRequests;
private String jobId; private String jobId;
private ExtractedFields extractedFields; private TimeBasedExtractedFields extractedFields;
private List<String> types; private List<String> types;
private List<String> indices; private List<String> indices;
private QueryBuilder query; private QueryBuilder query;
@ -128,7 +130,7 @@ public class ScrollDataExtractorTests extends ESTestCase {
capturedContinueScrollIds = new ArrayList<>(); capturedContinueScrollIds = new ArrayList<>();
jobId = "test-job"; jobId = "test-job";
ExtractedField timeField = ExtractedField.newField("time", ExtractedField.ExtractionMethod.DOC_VALUE); ExtractedField timeField = ExtractedField.newField("time", ExtractedField.ExtractionMethod.DOC_VALUE);
extractedFields = new ExtractedFields(timeField, extractedFields = new TimeBasedExtractedFields(timeField,
Arrays.asList(timeField, ExtractedField.newField("field_1", ExtractedField.ExtractionMethod.DOC_VALUE))); Arrays.asList(timeField, ExtractedField.newField("field_1", ExtractedField.ExtractionMethod.DOC_VALUE)));
indices = Arrays.asList("index-1", "index-2"); indices = Arrays.asList("index-1", "index-2");
types = Arrays.asList("type-1", "type-2"); types = Arrays.asList("type-1", "type-2");

View File

@ -7,6 +7,9 @@ package org.elasticsearch.xpack.ml.datafeed.extractor.scroll;
import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHit;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.datafeed.extractor.fields.ExtractedField;
import org.elasticsearch.xpack.ml.datafeed.extractor.fields.ExtractedFields;
import org.elasticsearch.xpack.ml.datafeed.extractor.fields.TimeBasedExtractedFields;
import org.elasticsearch.xpack.ml.test.SearchHitBuilder; import org.elasticsearch.xpack.ml.test.SearchHitBuilder;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
@ -23,7 +26,8 @@ public class SearchHitToJsonProcessorTests extends ESTestCase {
ExtractedField missingField = ExtractedField.newField("missing", ExtractedField.ExtractionMethod.DOC_VALUE); ExtractedField missingField = ExtractedField.newField("missing", ExtractedField.ExtractionMethod.DOC_VALUE);
ExtractedField singleField = ExtractedField.newField("single", ExtractedField.ExtractionMethod.DOC_VALUE); ExtractedField singleField = ExtractedField.newField("single", ExtractedField.ExtractionMethod.DOC_VALUE);
ExtractedField arrayField = ExtractedField.newField("array", ExtractedField.ExtractionMethod.DOC_VALUE); ExtractedField arrayField = ExtractedField.newField("array", ExtractedField.ExtractionMethod.DOC_VALUE);
ExtractedFields extractedFields = new ExtractedFields(timeField, Arrays.asList(timeField, missingField, singleField, arrayField)); TimeBasedExtractedFields extractedFields = new TimeBasedExtractedFields(timeField,
Arrays.asList(timeField, missingField, singleField, arrayField));
SearchHit hit = new SearchHitBuilder(8) SearchHit hit = new SearchHitBuilder(8)
.addField("time", 1000L) .addField("time", 1000L)
@ -41,7 +45,8 @@ public class SearchHitToJsonProcessorTests extends ESTestCase {
ExtractedField missingField = ExtractedField.newField("missing", ExtractedField.ExtractionMethod.DOC_VALUE); ExtractedField missingField = ExtractedField.newField("missing", ExtractedField.ExtractionMethod.DOC_VALUE);
ExtractedField singleField = ExtractedField.newField("single", ExtractedField.ExtractionMethod.DOC_VALUE); ExtractedField singleField = ExtractedField.newField("single", ExtractedField.ExtractionMethod.DOC_VALUE);
ExtractedField arrayField = ExtractedField.newField("array", ExtractedField.ExtractionMethod.DOC_VALUE); ExtractedField arrayField = ExtractedField.newField("array", ExtractedField.ExtractionMethod.DOC_VALUE);
ExtractedFields extractedFields = new ExtractedFields(timeField, Arrays.asList(timeField, missingField, singleField, arrayField)); TimeBasedExtractedFields extractedFields = new TimeBasedExtractedFields(timeField,
Arrays.asList(timeField, missingField, singleField, arrayField));
SearchHit hit1 = new SearchHitBuilder(8) SearchHit hit1 = new SearchHitBuilder(8)
.addField("time", 1000L) .addField("time", 1000L)

View File

@ -229,7 +229,7 @@ setup:
job_id: "start-stop-datafeed-job-field-without-mappings" job_id: "start-stop-datafeed-job-field-without-mappings"
- do: - do:
catch: /datafeed \[start-stop-datafeed-job-field-without-mappings-feed] cannot retrieve field \[airline2\] because it has no mappings/ catch: /\[start-stop-datafeed-job-field-without-mappings-feed] cannot retrieve field \[airline2\] because it has no mappings/
xpack.ml.start_datafeed: xpack.ml.start_datafeed:
datafeed_id: "start-stop-datafeed-job-field-without-mappings-feed" datafeed_id: "start-stop-datafeed-job-field-without-mappings-feed"