[ML] Extract parent field when job has text multi-field (elastic/x-pack-elasticsearch#1705)

In the case where a field is a text multi-field, it has
no doc values and it is not in source. Thus, the datafeed
will not be able to extract it.

However, it is possible to extract it by getting its parent
field instead. This commit implements the logic to look
in parent fields when the field in question is a text field.

Original commit: elastic/x-pack-elasticsearch@f116e89921
This commit is contained in:
Dimitris Athanasiou 2017-06-13 18:00:24 +01:00 committed by GitHub
parent fff33e753a
commit f2e2ccae01
9 changed files with 218 additions and 40 deletions

View File

@ -13,20 +13,34 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
/**
* Represents a field to be extracted by the datafeed.
* It encapsulates the extraction logic.
*/
abstract class ExtractedField {
public enum ExtractionMethod {
SOURCE, DOC_VALUE, SCRIPT_FIELD
}
/** The name of the field as configured in the job */
protected final String alias;
/** The name of the field we extract */
protected final String name;
private final ExtractionMethod extractionMethod;
protected ExtractedField(String name, ExtractionMethod extractionMethod) {
protected ExtractedField(String alias, String name, ExtractionMethod extractionMethod) {
this.alias = Objects.requireNonNull(alias);
this.name = Objects.requireNonNull(name);
this.extractionMethod = Objects.requireNonNull(extractionMethod);
}
/** @return the name of the field as configured in the job (the name callers/results see) */
public String getAlias() {
    return alias;
}
/** @return the name of the field that is actually extracted from the search hit */
public String getName() {
    return name;
}
@ -45,12 +59,16 @@ abstract class ExtractedField {
}
public static ExtractedField newField(String name, ExtractionMethod extractionMethod) {
return newField(name, name, extractionMethod);
}
public static ExtractedField newField(String alias, String name, ExtractionMethod extractionMethod) {
switch (extractionMethod) {
case DOC_VALUE:
case SCRIPT_FIELD:
return new FromFields(name, extractionMethod);
return new FromFields(alias, name, extractionMethod);
case SOURCE:
return new FromSource(name, extractionMethod);
return new FromSource(alias, name, extractionMethod);
default:
throw new IllegalArgumentException("Invalid extraction method [" + extractionMethod + "]");
}
@ -58,8 +76,8 @@ abstract class ExtractedField {
private static class FromFields extends ExtractedField {
FromFields(String name, ExtractionMethod extractionMethod) {
super(name, extractionMethod);
FromFields(String alias, String name, ExtractionMethod extractionMethod) {
super(alias, name, extractionMethod);
}
@Override
@ -76,7 +94,7 @@ abstract class ExtractedField {
private static class TimeField extends FromFields {
TimeField(String name, ExtractionMethod extractionMethod) {
super(name, extractionMethod);
super(name, name, extractionMethod);
}
@Override
@ -94,8 +112,8 @@ abstract class ExtractedField {
private String[] namePath;
FromSource(String name, ExtractionMethod extractionMethod) {
super(name, extractionMethod);
FromSource(String alias, String name, ExtractionMethod extractionMethod) {
super(alias, name, extractionMethod);
namePath = name.split("\\.");
}

View File

@ -12,6 +12,7 @@ import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.utils.MlStrings;
import java.util.ArrayList;
import java.util.Arrays;
@ -22,8 +23,13 @@ import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
/**
* The fields the datafeed has to extract
*/
class ExtractedFields {
private static final String TEXT = "text";
private final ExtractedField timeField;
private final List<ExtractedField> allFields;
private final String[] docValueFields;
@ -68,19 +74,21 @@ class ExtractedFields {
public Long timeFieldValue(SearchHit hit) {
Object[] value = timeField.value(hit);
if (value.length != 1) {
throw new RuntimeException("Time field [" + timeField.getName() + "] expected a single value; actual was: "
throw new RuntimeException("Time field [" + timeField.getAlias() + "] expected a single value; actual was: "
+ Arrays.toString(value));
}
if (value[0] instanceof Long) {
return (Long) value[0];
}
throw new RuntimeException("Time field [" + timeField.getName() + "] expected a long value; actual was: " + value[0]);
throw new RuntimeException("Time field [" + timeField.getAlias() + "] expected a long value; actual was: " + value[0]);
}
public static ExtractedFields build(Job job, DatafeedConfig datafeed, FieldCapabilitiesResponse fieldsCapabilities) {
Set<String> scriptFields = datafeed.getScriptFields().stream().map(sf -> sf.fieldName()).collect(Collectors.toSet());
ExtractionMethodDetector extractionMethodDetector = new ExtractionMethodDetector(datafeed.getId(), scriptFields,
fieldsCapabilities);
String timeField = job.getDataDescription().getTimeField();
if (scriptFields.contains(timeField) == false && isAggregatable(datafeed.getId(), timeField, fieldsCapabilities) == false) {
if (scriptFields.contains(timeField) == false && extractionMethodDetector.isAggregatable(timeField) == false) {
throw ExceptionsHelper.badRequestException("datafeed [" + datafeed.getId() + "] cannot retrieve time field [" + timeField
+ "] because it is not aggregatable");
}
@ -90,27 +98,62 @@ class ExtractedFields {
f -> !(f.equals(timeField) || f.equals(AnalysisConfig.ML_CATEGORY_FIELD))).collect(Collectors.toList());
List<ExtractedField> allExtractedFields = new ArrayList<>(remainingFields.size() + 1);
allExtractedFields.add(timeExtractedField);
for (String field : remainingFields) {
ExtractedField.ExtractionMethod method = scriptFields.contains(field) ? ExtractedField.ExtractionMethod.SCRIPT_FIELD
: isAggregatable(datafeed.getId(), field, fieldsCapabilities) ? ExtractedField.ExtractionMethod.DOC_VALUE
: ExtractedField.ExtractionMethod.SOURCE;
allExtractedFields.add(ExtractedField.newField(field, method));
}
remainingFields.stream().forEach(field -> allExtractedFields.add(extractionMethodDetector.detect(field)));
return new ExtractedFields(timeExtractedField, allExtractedFields);
}
private static boolean isAggregatable(String datafeedId, String field, FieldCapabilitiesResponse fieldsCapabilities) {
Map<String, FieldCapabilities> fieldCaps = fieldsCapabilities.getField(field);
if (fieldCaps == null || fieldCaps.isEmpty()) {
throw ExceptionsHelper.badRequestException("datafeed [" + datafeedId + "] cannot retrieve field [" + field
+ "] because it has no mappings");
private static class ExtractionMethodDetector {
private final String datafeedId;
private final Set<String> scriptFields;
private final FieldCapabilitiesResponse fieldsCapabilities;
/**
 * Decides how each datafeed field should be extracted, based on the
 * configured script fields and the index field capabilities.
 */
private ExtractionMethodDetector(String datafeedId, Set<String> scriptFields, FieldCapabilitiesResponse fieldsCapabilities) {
    this.datafeedId = datafeedId;               // only used to build error messages
    this.scriptFields = scriptFields;           // fields produced by the datafeed's script_fields
    this.fieldsCapabilities = fieldsCapabilities;
}
for (FieldCapabilities capsPerIndex : fieldCaps.values()) {
if (!capsPerIndex.isAggregatable()) {
return false;
/**
 * Works out the extraction method for {@code field} and builds the
 * corresponding {@link ExtractedField}. The alias is always the requested
 * field; the extracted name may be its parent for text multi-fields.
 */
private ExtractedField detect(String field) {
    // Script fields are produced by the search request itself.
    if (scriptFields.contains(field)) {
        return ExtractedField.newField(field, field, ExtractedField.ExtractionMethod.SCRIPT_FIELD);
    }
    // Aggregatable fields have doc values, the preferred extraction route.
    if (isAggregatable(field)) {
        return ExtractedField.newField(field, field, ExtractedField.ExtractionMethod.DOC_VALUE);
    }
    if (isText(field)) {
        String parentField = MlStrings.getParentField(field);
        // Field is text so check if it is a multi-field
        if (Objects.equals(parentField, field) == false && fieldsCapabilities.getField(parentField) != null) {
            // Field is a multi-field which means it won't be available in source. Let's take the parent instead.
            ExtractedField.ExtractionMethod parentMethod = isAggregatable(parentField)
                    ? ExtractedField.ExtractionMethod.DOC_VALUE
                    : ExtractedField.ExtractionMethod.SOURCE;
            return ExtractedField.newField(field, parentField, parentMethod);
        }
    }
    // Fallback: read the value straight out of _source.
    return ExtractedField.newField(field, field, ExtractedField.ExtractionMethod.SOURCE);
}
/**
 * Checks the field is aggregatable in every index it is mapped in.
 *
 * @throws org.elasticsearch.ElasticsearchStatusException (bad request) when the field has no mappings at all
 */
private boolean isAggregatable(String field) {
    Map<String, FieldCapabilities> capsPerType = fieldsCapabilities.getField(field);
    if (capsPerType == null || capsPerType.isEmpty()) {
        throw ExceptionsHelper.badRequestException("datafeed [" + datafeedId + "] cannot retrieve field [" + field
                + "] because it has no mappings");
    }
    // Doc-value extraction is only safe if every mapping of the field is aggregatable.
    return capsPerType.values().stream().allMatch(FieldCapabilities::isAggregatable);
}
/**
 * A field counts as text only when it is mapped exclusively as the
 * {@code text} type across all indices (a single capabilities entry).
 */
private boolean isText(String field) {
    Map<String, FieldCapabilities> capsPerType = fieldsCapabilities.getField(field);
    return capsPerType != null && capsPerType.size() == 1 && capsPerType.containsKey(TEXT);
}
return true;
}
}

View File

@ -17,7 +17,9 @@ import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractor;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.utils.MlStrings;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
@ -70,8 +72,10 @@ public class ScrollDataExtractorFactory implements DataExtractorFactory {
// Step 1. Get field capabilities necessary to build the information of how to extract fields
FieldCapabilitiesRequest fieldCapabilitiesRequest = new FieldCapabilitiesRequest();
fieldCapabilitiesRequest.indices(datafeed.getIndices().toArray(new String[datafeed.getIndices().size()]));
List<String> fields = job.allFields();
fieldCapabilitiesRequest.fields(fields.toArray(new String[fields.size()]));
// We need capabilities for all fields matching the requested fields' parents so that we can work around
// multi-fields that are not in source.
String[] requestFields = job.allFields().stream().map(f -> MlStrings.getParentField(f) + "*").toArray(size -> new String[size]);
fieldCapabilitiesRequest.fields(requestFields);
client.execute(FieldCapabilitiesAction.INSTANCE, fieldCapabilitiesRequest, fieldCapabilitiesHandler);
}
}

View File

@ -27,7 +27,7 @@ class SearchHitToJsonProcessor implements Releasable {
public void process(SearchHit hit) throws IOException {
jsonBuilder.startObject();
for (ExtractedField field : fields.getAllFields()) {
writeKeyValue(field.getName(), field.value(hit));
writeKeyValue(field.getAlias(), field.value(hit));
}
jsonBuilder.endObject();
}

View File

@ -60,4 +60,23 @@ public final class MlStrings {
public static boolean isValidId(String id) {
return id != null && VALID_ID_CHAR_PATTERN.matcher(id).matches() && !Job.ALL.equals(id);
}
/**
* Returns the path to the parent field if {@code fieldPath} is nested
* or {@code fieldPath} itself.
*
* @param fieldPath a field path
* @return the path to the parent field if {@code fieldPath} is nested
* or {@code fieldPath} itself
*/
public static String getParentField(String fieldPath) {
    if (fieldPath == null) {
        return null;
    }
    // Everything before the last dot is the parent; no dot means the field has no parent.
    int lastDot = fieldPath.lastIndexOf('.');
    return lastDot < 0 ? fieldPath : fieldPath.substring(0, lastDot);
}
}

View File

@ -58,7 +58,7 @@ public class ExtractedFieldTests extends ESTestCase {
public void testValueGivenNestedSource() {
SearchHit hit = new SearchHitBuilder(42).setSource("{\"level_1\":{\"level_2\":{\"foo\":\"bar\"}}}").build();
ExtractedField nested = ExtractedField.newField("level_1.level_2.foo", ExtractedField.ExtractionMethod.SOURCE);
ExtractedField nested = ExtractedField.newField("alias", "level_1.level_2.foo", ExtractedField.ExtractionMethod.SOURCE);
assertThat(nested.value(hit), equalTo(new String[] { "bar" }));
}
@ -102,4 +102,20 @@ public class ExtractedFieldTests extends ESTestCase {
assertThat(timeField.value(hit), equalTo(new Object[] { 123456789L }));
}
public void testAliasVersusName() {
    // alias == name: value looked up under the same key
    SearchHit hit = new SearchHitBuilder(42).addField("a", 1).addField("b", 2).build();
    ExtractedField field = ExtractedField.newField("a", "a", ExtractedField.ExtractionMethod.DOC_VALUE);
    assertThat(field.getAlias(), equalTo("a"));
    assertThat(field.getName(), equalTo("a"));
    assertThat(field.value(hit), equalTo(new Integer[] { 1 }));
    // alias != name: extraction must use the name ("b"), not the alias ("a")
    hit = new SearchHitBuilder(42).addField("a", 1).addField("b", 2).build();
    field = ExtractedField.newField("a", "b", ExtractedField.ExtractionMethod.DOC_VALUE);
    assertThat(field.getAlias(), equalTo("a"));
    assertThat(field.getName(), equalTo("b"));
    assertThat(field.value(hit), equalTo(new Integer[] { 2 }));
}
}

View File

@ -132,6 +132,49 @@ public class ExtractedFieldsTests extends ESTestCase {
assertThat(extractedFields.getAllFields().size(), equalTo(4));
}
public void testBuildGivenMultiFields() {
    // Job references two multi-field variants: a non-aggregatable text sub-field
    // and an aggregatable keyword sub-field.
    Job.Builder jobBuilder = new Job.Builder("foo");
    jobBuilder.setDataDescription(new DataDescription.Builder());
    Detector.Builder detector = new Detector.Builder("count", null);
    detector.setByFieldName("airline.text");
    detector.setOverFieldName("airport.keyword");
    jobBuilder.setAnalysisConfig(new AnalysisConfig.Builder(Collections.singletonList(detector.build())));
    DatafeedConfig.Builder datafeedBuilder = new DatafeedConfig.Builder("feed", jobBuilder.getId());
    datafeedBuilder.setIndices(Collections.singletonList("foo"));
    Map<String, FieldCapabilities> timeCaps = new HashMap<>();
    timeCaps.put("date", createFieldCaps(true));
    Map<String, FieldCapabilities> text = new HashMap<>();
    text.put("text", createFieldCaps(false));
    Map<String, FieldCapabilities> keyword = new HashMap<>();
    keyword.put("keyword", createFieldCaps(true));
    FieldCapabilitiesResponse fieldCapabilitiesResponse = mock(FieldCapabilitiesResponse.class);
    when(fieldCapabilitiesResponse.getField("time")).thenReturn(timeCaps);
    // "airline.text" is text and its parent "airline" is also mapped, so extraction
    // should fall back to the parent via source.
    when(fieldCapabilitiesResponse.getField("airline")).thenReturn(text);
    when(fieldCapabilitiesResponse.getField("airline.text")).thenReturn(text);
    when(fieldCapabilitiesResponse.getField("airport")).thenReturn(text);
    when(fieldCapabilitiesResponse.getField("airport.keyword")).thenReturn(keyword);
    ExtractedFields extractedFields = ExtractedFields.build(jobBuilder.build(new Date()), datafeedBuilder.build(),
            fieldCapabilitiesResponse);
    assertThat(extractedFields.timeField(), equalTo("time"));
    // "airport.keyword" is aggregatable, so it stays a doc_value field under its own name.
    assertThat(extractedFields.getDocValueFields().length, equalTo(2));
    assertThat(extractedFields.getDocValueFields()[0], equalTo("time"));
    assertThat(extractedFields.getDocValueFields()[1], equalTo("airport.keyword"));
    // "airline.text" is extracted from source via its parent "airline", aliased back to "airline.text".
    assertThat(extractedFields.getSourceFields().length, equalTo(1));
    assertThat(extractedFields.getSourceFields()[0], equalTo("airline"));
    assertThat(extractedFields.getAllFields().size(), equalTo(3));
    assertThat(extractedFields.getAllFields().stream().filter(f -> f.getName().equals("time")).findFirst().get().getAlias(),
            equalTo("time"));
    assertThat(extractedFields.getAllFields().stream().filter(f -> f.getName().equals("airport.keyword")).findFirst().get().getAlias(),
            equalTo("airport.keyword"));
    assertThat(extractedFields.getAllFields().stream().filter(f -> f.getName().equals("airline")).findFirst().get().getAlias(),
            equalTo("airline.text"));
}
public void testBuildGivenTimeFieldIsNotAggregatable() {
Job.Builder jobBuilder = new Job.Builder("foo");
jobBuilder.setDataDescription(new DataDescription.Builder());

View File

@ -76,13 +76,19 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
client().performRequest("put", "airline-data-empty", Collections.emptyMap(),
new StringEntity(mappings, ContentType.APPLICATION_JSON));
// Create index with source = enabled, doc_values = enabled, stored = false
// Create index with source = enabled, doc_values = enabled, stored = false + multi-field
mappings = "{"
+ " \"mappings\": {"
+ " \"response\": {"
+ " \"properties\": {"
+ " \"time stamp\": { \"type\":\"date\"}," // space in 'time stamp' is intentional
+ " \"airline\": { \"type\":\"keyword\"},"
+ " \"airline\": {"
+ " \"type\":\"text\","
+ " \"fields\":{"
+ " \"text\":{\"type\":\"text\"},"
+ " \"keyword\":{\"type\":\"keyword\"}"
+ " }"
+ " },"
+ " \"responsetime\": { \"type\":\"float\"}"
+ " }"
+ " }"
@ -205,8 +211,19 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
client().performRequest("post", "_refresh");
}
public void testLookbackOnly() throws Exception {
new LookbackOnlyTestHelper("test-lookback-only", "airline-data").setShouldSucceedProcessing(true).execute();
public void testLookbackOnlyWithMixedTypes() throws Exception {
new LookbackOnlyTestHelper("test-lookback-only-with-mixed-types", "airline-data")
.setShouldSucceedProcessing(true).execute();
}
public void testLookbackOnlyWithKeywordMultiField() throws Exception {
new LookbackOnlyTestHelper("test-lookback-only-with-keyword-multi-field", "airline-data")
.setAirlineVariant("airline.keyword").setShouldSucceedProcessing(true).execute();
}
public void testLookbackOnlyWithTextMultiField() throws Exception {
new LookbackOnlyTestHelper("test-lookback-only-with-keyword-multi-field", "airline-data")
.setAirlineVariant("airline.text").setShouldSucceedProcessing(true).execute();
}
public void testLookbackOnlyWithDocValuesDisabled() throws Exception {
@ -358,7 +375,7 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
public void testRealtime() throws Exception {
String jobId = "job-realtime-1";
createJob(jobId);
createJob(jobId, "airline");
String datafeedId = jobId + "-datafeed";
new DatafeedBuilder(datafeedId, jobId, "airline-data", "response").build();
openJob(client(), jobId);
@ -403,7 +420,7 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
public void testForceDeleteWhileDatafeedIsRunning() throws Exception {
String jobId = "job-realtime-2";
createJob(jobId);
createJob(jobId, "airline");
String datafeedId = jobId + "-datafeed";
new DatafeedBuilder(datafeedId, jobId, "airline-data", "response").build();
openJob(client(), jobId);
@ -431,6 +448,7 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
private class LookbackOnlyTestHelper {
private String jobId;
private String airlineVariant;
private String dataIndex;
private boolean addScriptedFields;
private boolean shouldSucceedInput;
@ -441,6 +459,7 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
this.dataIndex = dataIndex;
this.shouldSucceedInput = true;
this.shouldSucceedProcessing = true;
this.airlineVariant = "airline";
}
public LookbackOnlyTestHelper setAddScriptedFields(boolean value) {
@ -448,6 +467,12 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
return this;
}
public LookbackOnlyTestHelper setAirlineVariant(String airlineVariant) {
this.airlineVariant = airlineVariant;
return this;
}
public LookbackOnlyTestHelper setShouldSucceedInput(boolean value) {
shouldSucceedInput = value;
return this;
@ -459,7 +484,7 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
}
public void execute() throws Exception {
createJob(jobId);
createJob(jobId, airlineVariant);
String datafeedId = "datafeed-" + jobId;
new DatafeedBuilder(datafeedId, jobId, dataIndex, "response")
.setScriptedFields(addScriptedFields ?
@ -515,10 +540,11 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
});
}
private Response createJob(String id) throws Exception {
private Response createJob(String id, String airlineVariant) throws Exception {
String job = "{\n" + " \"description\":\"Analysis of response time by airline\",\n"
+ " \"analysis_config\" : {\n" + " \"bucket_span\":\"1h\",\n"
+ " \"detectors\" :[{\"function\":\"mean\",\"field_name\":\"responsetime\",\"by_field_name\":\"airline\"}]\n"
+ " \"detectors\" :[\n"
+ " {\"function\":\"mean\",\"field_name\":\"responsetime\",\"by_field_name\":\"" + airlineVariant + "\"}]\n"
+ " },\n" + " \"data_description\" : {\n"
+ " \"format\":\"xcontent\",\n"
+ " \"time_field\":\"time stamp\",\n" + " \"time_format\":\"yyyy-MM-dd'T'HH:mm:ssX\"\n" + " }\n"

View File

@ -8,7 +8,9 @@ package org.elasticsearch.xpack.ml.utils;
import org.elasticsearch.test.ESTestCase;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
public class MlStringsTests extends ESTestCase {
public void testDoubleQuoteIfNotAlphaNumeric() {
@ -31,4 +33,11 @@ public class MlStringsTests extends ESTestCase {
assertThat(MlStrings.isValidId("!afafd"), is(false));
assertThat(MlStrings.isValidId("_all"), is(false));
}
public void testGetParentField() {
    assertThat(MlStrings.getParentField(null), is(nullValue()));     // null passes through
    assertThat(MlStrings.getParentField("foo"), equalTo("foo"));     // no dot -> field is its own parent
    assertThat(MlStrings.getParentField("foo.bar"), equalTo("foo")); // single nesting level
    assertThat(MlStrings.getParentField("x.y.z"), equalTo("x.y"));   // only the last segment is stripped
}
}