From 4d6e037e9053baae71b3eaa40321c02759d4b6d8 Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Wed, 20 Nov 2019 10:02:42 +0200 Subject: [PATCH] [7.x][ML] Extract creation of DFA field extractor into a factory (#49315) (#49329) This commit moves the async calls required to retrieve the components that make up `ExtractedFieldsDetector` out of `DataFrameDataExtractorFactory` and into a dedicated `ExtractedFieldsDetectorFactory` class. A few more refactorings are performed: - The detector no longer needs the results field. Instead, it knows whether to use it or not based on whether the task is restarting. - We pass more accurately whether the task is restarting or not. - The validation of whether fields that have a cardinality limit are valid is now performed in the detector after retrieving the respective cardinalities. Backport of #49315 --- .../TransportEstimateMemoryUsageAction.java | 1 + ...ransportStartDataFrameAnalyticsAction.java | 80 ++++--- .../DataFrameDataExtractorFactory.java | 223 ++---------------- .../extractor/ExtractedFieldsDetector.java | 39 ++- .../ExtractedFieldsDetectorFactory.java | 189 +++++++++++++++ .../ExtractedFieldsDetectorTests.java | 105 ++++----- 6 files changed, 339 insertions(+), 298 deletions(-) create mode 100644 x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/ExtractedFieldsDetectorFactory.java diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportEstimateMemoryUsageAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportEstimateMemoryUsageAction.java index 0ab00bb6071..a82db7c4f97 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportEstimateMemoryUsageAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportEstimateMemoryUsageAction.java @@ -81,6 +81,7 @@ public class TransportEstimateMemoryUsageAction DataFrameDataExtractorFactory.createForSourceIndices( 
client, taskId, + true, // We are not interested in first-time run validations here request.getConfig(), ActionListener.wrap( dataExtractorFactory -> { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java index 8bfa584ac0c..1740a7fb532 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java @@ -64,6 +64,7 @@ import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsTask; import org.elasticsearch.xpack.ml.dataframe.MappingsMerger; import org.elasticsearch.xpack.ml.dataframe.SourceDestValidator; import org.elasticsearch.xpack.ml.dataframe.extractor.DataFrameDataExtractorFactory; +import org.elasticsearch.xpack.ml.dataframe.extractor.ExtractedFieldsDetectorFactory; import org.elasticsearch.xpack.ml.dataframe.persistence.DataFrameAnalyticsConfigProvider; import org.elasticsearch.xpack.ml.job.JobNodeSelector; import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor; @@ -228,33 +229,7 @@ public class TransportStartDataFrameAnalyticsAction // Step 6. 
Validate that there are analyzable data in the source index ActionListener validateMappingsMergeListener = ActionListener.wrap( - startContext -> DataFrameDataExtractorFactory.createForSourceIndices(client, - "validate_source_index_has_rows-" + id, - startContext.config, - ActionListener.wrap( - dataFrameDataExtractorFactory -> - dataFrameDataExtractorFactory - .newExtractor(false) - .collectDataSummaryAsync(ActionListener.wrap( - dataSummary -> { - if (dataSummary.rows == 0) { - finalListener.onFailure(ExceptionsHelper.badRequestException( - "Unable to start {} as no documents in the source indices [{}] contained all the fields " - + "selected for analysis. If you are relying on automatic field selection then there are " - + "currently mapped fields that do not exist in any indexed documents, and you will have " - + "to switch to explicit field selection and include only fields that exist in indexed " - + "documents.", - id, Strings.arrayToCommaDelimitedString(startContext.config.getSource().getIndex()) - )); - } else { - finalListener.onResponse(startContext); - } - }, - finalListener::onFailure - )), - finalListener::onFailure - )) - , + startContext -> validateSourceIndexHasRows(startContext, finalListener), finalListener::onFailure ); @@ -269,9 +244,7 @@ public class TransportStartDataFrameAnalyticsAction // Step 4. 
Validate dest index is empty if task is starting for first time ActionListener toValidateDestEmptyListener = ActionListener.wrap( startContext -> { - DataFrameAnalyticsTask.StartingState startingState = DataFrameAnalyticsTask.determineStartingState( - startContext.config.getId(), startContext.progressOnStart); - switch (startingState) { + switch (startContext.startingState) { case FIRST_TIME: checkDestIndexIsEmptyIfExists(startContext, toValidateMappingsListener); break; @@ -285,7 +258,7 @@ public class TransportStartDataFrameAnalyticsAction "Cannot start because the job has already finished")); break; default: - finalListener.onFailure(ExceptionsHelper.serverError("Unexpected starting state " + startingState)); + finalListener.onFailure(ExceptionsHelper.serverError("Unexpected starting state " + startContext.startingState)); break; } }, @@ -295,9 +268,16 @@ public class TransportStartDataFrameAnalyticsAction // Step 3. Validate source and dest; check data extraction is possible ActionListener startContextListener = ActionListener.wrap( startContext -> { + // Validate the query parses + startContext.config.getSource().getParsedQuery(); + + // Validate source/dest are valid new SourceDestValidator(clusterService.state(), indexNameExpressionResolver).check(startContext.config); - DataFrameDataExtractorFactory.validateConfigAndSourceIndex(client, startContext.config, ActionListener.wrap( - config -> toValidateDestEmptyListener.onResponse(startContext), finalListener::onFailure)); + + // Validate extraction is possible + boolean isTaskRestarting = startContext.startingState != DataFrameAnalyticsTask.StartingState.FIRST_TIME; + new ExtractedFieldsDetectorFactory(client).createFromSource(startContext.config, isTaskRestarting, ActionListener.wrap( + extractedFieldsDetector -> toValidateDestEmptyListener.onResponse(startContext), finalListener::onFailure)); }, finalListener::onFailure ); @@ -313,6 +293,38 @@ public class TransportStartDataFrameAnalyticsAction 
configProvider.get(id, getConfigListener); } + private void validateSourceIndexHasRows(StartContext startContext, ActionListener listener) { + boolean isTaskRestarting = startContext.startingState != DataFrameAnalyticsTask.StartingState.FIRST_TIME; + DataFrameDataExtractorFactory.createForSourceIndices(client, + "validate_source_index_has_rows-" + startContext.config.getId(), + isTaskRestarting, + startContext.config, + ActionListener.wrap( + dataFrameDataExtractorFactory -> + dataFrameDataExtractorFactory + .newExtractor(false) + .collectDataSummaryAsync(ActionListener.wrap( + dataSummary -> { + if (dataSummary.rows == 0) { + listener.onFailure(ExceptionsHelper.badRequestException( + "Unable to start {} as no documents in the source indices [{}] contained all the fields " + + "selected for analysis. If you are relying on automatic field selection then there are " + + "currently mapped fields that do not exist in any indexed documents, and you will have " + + "to switch to explicit field selection and include only fields that exist in indexed " + + "documents.", + startContext.config.getId(), + Strings.arrayToCommaDelimitedString(startContext.config.getSource().getIndex()) + )); + } else { + listener.onResponse(startContext); + } + }, + listener::onFailure + )), + listener::onFailure + )); + } + private void getProgress(DataFrameAnalyticsConfig config, ActionListener> listener) { GetDataFrameAnalyticsStatsAction.Request getStatsRequest = new GetDataFrameAnalyticsStatsAction.Request(config.getId()); executeAsyncWithOrigin(client, ML_ORIGIN, GetDataFrameAnalyticsStatsAction.INSTANCE, getStatsRequest, ActionListener.wrap( @@ -389,10 +401,12 @@ public class TransportStartDataFrameAnalyticsAction private static class StartContext { private final DataFrameAnalyticsConfig config; private final List progressOnStart; + private final DataFrameAnalyticsTask.StartingState startingState; private StartContext(DataFrameAnalyticsConfig config, List progressOnStart) { this.config = 
config; this.progressOnStart = progressOnStart; + this.startingState = DataFrameAnalyticsTask.determineStartingState(config.getId(), progressOnStart); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractorFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractorFactory.java index afa9e51b626..ce21973ca91 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractorFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractorFactory.java @@ -5,43 +5,20 @@ */ package org.elasticsearch.xpack.ml.dataframe.extractor; -import com.carrotsearch.hppc.cursors.ObjectObjectCursor; -import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest; -import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse; -import org.elasticsearch.action.fieldcaps.FieldCapabilitiesAction; -import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest; -import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse; -import org.elasticsearch.action.search.SearchAction; -import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.Client; -import org.elasticsearch.common.collect.ImmutableOpenMap; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.IndexNotFoundException; -import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.search.aggregations.AggregationBuilders; -import 
org.elasticsearch.search.aggregations.Aggregations; -import org.elasticsearch.search.aggregations.metrics.Cardinality; -import org.elasticsearch.search.builder.SearchSourceBuilder; -import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; -import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.extractor.ExtractedField; import org.elasticsearch.xpack.ml.extractor.ExtractedFields; import java.util.Arrays; -import java.util.Iterator; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; public class DataFrameDataExtractorFactory { @@ -94,27 +71,27 @@ public class DataFrameDataExtractorFactory { * The source index must exist and contain at least 1 compatible field or validations will fail. * * @param client ES Client used to make calls against the cluster + * @param taskId The task id + * @param isTaskRestarting Whether the task is restarting or it is running for the first time * @param config The config from which to create the extractor factory * @param listener The listener to notify on creation or failure */ public static void createForSourceIndices(Client client, String taskId, + boolean isTaskRestarting, DataFrameAnalyticsConfig config, ActionListener listener) { - validateIndexAndExtractFields( - client, - config.getSource().getIndex(), - config, - null, - false, - ActionListener.wrap( - extractedFields -> listener.onResponse( - new DataFrameDataExtractorFactory( - client, taskId, Arrays.asList(config.getSource().getIndex()), extractedFields, config.getHeaders(), - config.getAnalysis().supportsMissingValues())), - listener::onFailure - ) - ); + ExtractedFieldsDetectorFactory extractedFieldsDetectorFactory = new ExtractedFieldsDetectorFactory(client); + 
extractedFieldsDetectorFactory.createFromSource(config, isTaskRestarting, ActionListener.wrap( + extractedFieldsDetector -> { + ExtractedFields extractedFields = extractedFieldsDetector.detect(); + DataFrameDataExtractorFactory extractorFactory = new DataFrameDataExtractorFactory(client, taskId, + Arrays.asList(config.getSource().getIndex()), extractedFields, config.getHeaders(), + config.getAnalysis().supportsMissingValues()); + listener.onResponse(extractorFactory); + }, + listener::onFailure + )); } /** @@ -131,168 +108,16 @@ public class DataFrameDataExtractorFactory { DataFrameAnalyticsConfig config, boolean isTaskRestarting, ActionListener listener) { - validateIndexAndExtractFields( - client, - new String[] {config.getDest().getIndex()}, - config, - config.getDest().getResultsField(), - isTaskRestarting, - ActionListener.wrap( - extractedFields -> listener.onResponse( - new DataFrameDataExtractorFactory( - client, config.getId(), Arrays.asList(config.getDest().getIndex()), extractedFields, config.getHeaders(), - config.getAnalysis().supportsMissingValues())), - listener::onFailure - ) - ); - } - - /** - * Validates the source index and analytics config - * - * @param client ES Client to make calls - * @param config Analytics config to validate - * @param listener The listener to notify on failure or completion - */ - public static void validateConfigAndSourceIndex(Client client, - DataFrameAnalyticsConfig config, - ActionListener listener) { - validateIndexAndExtractFields( - client, - config.getSource().getIndex(), - config, - config.getDest().getResultsField(), - false, - ActionListener.wrap( - fields -> { - config.getSource().getParsedQuery(); // validate query is acceptable - listener.onResponse(config); - }, - listener::onFailure - ) - ); - } - - private static void validateIndexAndExtractFields(Client client, - String[] index, - DataFrameAnalyticsConfig config, - String resultsField, - boolean isTaskRestarting, - ActionListener listener) { - 
AtomicInteger docValueFieldsLimitHolder = new AtomicInteger(); - AtomicReference extractedFieldsHolder = new AtomicReference<>(); - - // Step 4. Check fields cardinality vs limits and notify listener - ActionListener checkCardinalityHandler = ActionListener.wrap( - searchResponse -> { - if (searchResponse != null) { - Aggregations aggs = searchResponse.getAggregations(); - if (aggs == null) { - listener.onFailure(ExceptionsHelper.serverError("Unexpected null response when gathering field cardinalities")); - return; - } - for (Map.Entry entry : config.getAnalysis().getFieldCardinalityLimits().entrySet()) { - String fieldName = entry.getKey(); - Long limit = entry.getValue(); - Cardinality cardinality = aggs.get(fieldName); - if (cardinality == null) { - listener.onFailure(ExceptionsHelper.serverError("Unexpected null response when gathering field cardinalities")); - return; - } - if (cardinality.getValue() > limit) { - listener.onFailure( - ExceptionsHelper.badRequestException( - "Field [{}] must have at most [{}] distinct values but there were at least [{}]", - fieldName, limit, cardinality.getValue())); - return; - } - } - } - listener.onResponse(extractedFieldsHolder.get()); + ExtractedFieldsDetectorFactory extractedFieldsDetectorFactory = new ExtractedFieldsDetectorFactory(client); + extractedFieldsDetectorFactory.createFromDest(config, isTaskRestarting, ActionListener.wrap( + extractedFieldsDetector -> { + ExtractedFields extractedFields = extractedFieldsDetector.detect(); + DataFrameDataExtractorFactory extractorFactory = new DataFrameDataExtractorFactory(client, config.getId(), + Collections.singletonList(config.getDest().getIndex()), extractedFields, config.getHeaders(), + config.getAnalysis().supportsMissingValues()); + listener.onResponse(extractorFactory); }, listener::onFailure - ); - - // Step 3. 
Extract fields (if possible) - ActionListener fieldCapabilitiesHandler = ActionListener.wrap( - fieldCapabilitiesResponse -> { - extractedFieldsHolder.set( - new ExtractedFieldsDetector( - index, config, resultsField, isTaskRestarting, docValueFieldsLimitHolder.get(), fieldCapabilitiesResponse) - .detect()); - - Map fieldCardinalityLimits = config.getAnalysis().getFieldCardinalityLimits(); - if (fieldCardinalityLimits.isEmpty()) { - checkCardinalityHandler.onResponse(null); - } else { - SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().size(0); - for (Map.Entry entry : fieldCardinalityLimits.entrySet()) { - String fieldName = entry.getKey(); - Long limit = entry.getValue(); - searchSourceBuilder.aggregation( - AggregationBuilders.cardinality(fieldName) - .field(fieldName) - .precisionThreshold(limit + 1)); - } - SearchRequest searchRequest = new SearchRequest(config.getSource().getIndex()).source(searchSourceBuilder); - ClientHelper.executeWithHeadersAsync( - config.getHeaders(), ClientHelper.ML_ORIGIN, client, SearchAction.INSTANCE, searchRequest, checkCardinalityHandler); - } - }, - listener::onFailure - ); - - // Step 2. Get field capabilities necessary to build the information of how to extract fields - ActionListener docValueFieldsLimitListener = ActionListener.wrap( - docValueFieldsLimit -> { - docValueFieldsLimitHolder.set(docValueFieldsLimit); - - FieldCapabilitiesRequest fieldCapabilitiesRequest = new FieldCapabilitiesRequest(); - fieldCapabilitiesRequest.indices(index); - fieldCapabilitiesRequest.indicesOptions(IndicesOptions.lenientExpandOpen()); - fieldCapabilitiesRequest.fields("*"); - ClientHelper.executeWithHeaders(config.getHeaders(), ClientHelper.ML_ORIGIN, client, () -> { - client.execute(FieldCapabilitiesAction.INSTANCE, fieldCapabilitiesRequest, fieldCapabilitiesHandler); - // This response gets discarded - the listener handles the real response - return null; - }); - }, - listener::onFailure - ); - - // Step 1. 
Get doc value fields limit - getDocValueFieldsLimit(client, index, docValueFieldsLimitListener); - } - - private static void getDocValueFieldsLimit(Client client, String[] index, ActionListener docValueFieldsLimitListener) { - ActionListener settingsListener = ActionListener.wrap(getSettingsResponse -> { - Integer minDocValueFieldsLimit = Integer.MAX_VALUE; - - ImmutableOpenMap indexToSettings = getSettingsResponse.getIndexToSettings(); - Iterator> iterator = indexToSettings.iterator(); - while (iterator.hasNext()) { - ObjectObjectCursor indexSettings = iterator.next(); - Integer indexMaxDocValueFields = IndexSettings.MAX_DOCVALUE_FIELDS_SEARCH_SETTING.get(indexSettings.value); - if (indexMaxDocValueFields < minDocValueFieldsLimit) { - minDocValueFieldsLimit = indexMaxDocValueFields; - } - } - docValueFieldsLimitListener.onResponse(minDocValueFieldsLimit); - }, - e -> { - if (ExceptionsHelper.unwrapCause(e) instanceof IndexNotFoundException) { - docValueFieldsLimitListener.onFailure(new ResourceNotFoundException("cannot retrieve data because index " - + ((IndexNotFoundException) e).getIndex() + " does not exist")); - } else { - docValueFieldsLimitListener.onFailure(e); - } - } - ); - - GetSettingsRequest getSettingsRequest = new GetSettingsRequest(); - getSettingsRequest.indices(index); - getSettingsRequest.includeDefaults(true); - getSettingsRequest.names(IndexSettings.MAX_DOCVALUE_FIELDS_SEARCH_SETTING.getKey()); - client.admin().indices().getSettings(getSettingsRequest, settingsListener); + )); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/ExtractedFieldsDetector.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/ExtractedFieldsDetector.java index 2a82ae7dcf9..5d94b57aca5 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/ExtractedFieldsDetector.java +++ 
b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/ExtractedFieldsDetector.java @@ -52,19 +52,20 @@ public class ExtractedFieldsDetector { private final String[] index; private final DataFrameAnalyticsConfig config; - private final String resultsField; private final boolean isTaskRestarting; private final int docValueFieldsLimit; private final FieldCapabilitiesResponse fieldCapabilitiesResponse; + private final Map fieldCardinalities; - ExtractedFieldsDetector(String[] index, DataFrameAnalyticsConfig config, String resultsField, boolean isTaskRestarting, - int docValueFieldsLimit, FieldCapabilitiesResponse fieldCapabilitiesResponse) { + ExtractedFieldsDetector(String[] index, DataFrameAnalyticsConfig config, boolean isTaskRestarting, + int docValueFieldsLimit, FieldCapabilitiesResponse fieldCapabilitiesResponse, + Map fieldCardinalities) { this.index = Objects.requireNonNull(index); this.config = Objects.requireNonNull(config); - this.resultsField = resultsField; this.isTaskRestarting = isTaskRestarting; this.docValueFieldsLimit = docValueFieldsLimit; this.fieldCapabilitiesResponse = Objects.requireNonNull(fieldCapabilitiesResponse); + this.fieldCardinalities = Objects.requireNonNull(fieldCardinalities); } public ExtractedFields detect() { @@ -79,11 +80,13 @@ public class ExtractedFieldsDetector { checkNoIgnoredFields(fields); checkFieldsHaveCompatibleTypes(fields); checkRequiredFields(fields); + checkFieldsWithCardinalityLimit(); return detectExtractedFields(fields); } private Set getIncludedFields() { Set fields = new HashSet<>(fieldCapabilitiesResponse.get().keySet()); + checkResultsFieldIsNotPresent(); removeFieldsUnderResultsField(fields); FetchSourceContext analyzedFields = config.getAnalyzedFields(); @@ -96,21 +99,13 @@ public class ExtractedFieldsDetector { return fields; } - private void removeFieldsUnderResultsField(Set fields) { - if (resultsField == null) { - return; - } - checkResultsFieldIsNotPresent(); - // Ignore fields 
under the results object - fields.removeIf(field -> field.startsWith(resultsField + ".")); - } - private void checkResultsFieldIsNotPresent() { // If the task is restarting we do not mind the index containing the results field, we will overwrite all docs if (isTaskRestarting) { return; } + String resultsField = config.getDest().getResultsField(); Map indexToFieldCaps = fieldCapabilitiesResponse.getField(resultsField); if (indexToFieldCaps != null && indexToFieldCaps.isEmpty() == false) { throw ExceptionsHelper.badRequestException( @@ -122,6 +117,11 @@ public class ExtractedFieldsDetector { } } + private void removeFieldsUnderResultsField(Set fields) { + // Ignore fields under the results object + fields.removeIf(field -> field.startsWith(config.getDest().getResultsField() + ".")); + } + private void removeFieldsWithIncompatibleTypes(Set fields) { Iterator fieldsIterator = fields.iterator(); while (fieldsIterator.hasNext()) { @@ -234,6 +234,19 @@ public class ExtractedFieldsDetector { } } + private void checkFieldsWithCardinalityLimit() { + for (Map.Entry entry : config.getAnalysis().getFieldCardinalityLimits().entrySet()) { + String fieldName = entry.getKey(); + long limit = entry.getValue(); + long cardinality = fieldCardinalities.get(fieldName); + if (cardinality > limit) { + throw ExceptionsHelper.badRequestException( + "Field [{}] must have at most [{}] distinct values but there were at least [{}]", + fieldName, limit, cardinality); + } + } + } + private ExtractedFields detectExtractedFields(Set fields) { List sortedFields = new ArrayList<>(fields); // We sort the fields to ensure the checksum for each document is deterministic diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/ExtractedFieldsDetectorFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/ExtractedFieldsDetectorFactory.java new file mode 100644 index 00000000000..ea37bdf393a --- /dev/null +++ 
b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/ExtractedFieldsDetectorFactory.java @@ -0,0 +1,189 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.dataframe.extractor; + +import com.carrotsearch.hppc.cursors.ObjectObjectCursor; +import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest; +import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse; +import org.elasticsearch.action.fieldcaps.FieldCapabilitiesAction; +import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest; +import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse; +import org.elasticsearch.action.search.SearchAction; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.collect.ImmutableOpenMap; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.search.aggregations.AggregationBuilders; +import org.elasticsearch.search.aggregations.Aggregations; +import org.elasticsearch.search.aggregations.metrics.Cardinality; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.xpack.core.ClientHelper; +import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; +import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import 
java.util.Map; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +/** + * A factory that retrieves all the parts necessary to build a {@link ExtractedFieldsDetector}. + */ +public class ExtractedFieldsDetectorFactory { + + private final Client client; + + public ExtractedFieldsDetectorFactory(Client client) { + this.client = Objects.requireNonNull(client); + } + + public void createFromSource(DataFrameAnalyticsConfig config, boolean isTaskRestarting, + ActionListener listener) { + create(config.getSource().getIndex(), config, isTaskRestarting, listener); + } + + public void createFromDest(DataFrameAnalyticsConfig config, boolean isTaskRestarting, + ActionListener listener) { + create(new String[] {config.getDest().getIndex()}, config, isTaskRestarting, listener); + } + + private void create(String[] index, DataFrameAnalyticsConfig config, boolean isTaskRestarting, + ActionListener listener) { + AtomicInteger docValueFieldsLimitHolder = new AtomicInteger(); + AtomicReference fieldCapsResponseHolder = new AtomicReference<>(); + + // Step 4. Create cardinality by field map and build detector + ActionListener> fieldCardinalitiesHandler = ActionListener.wrap( + fieldCardinalities -> { + ExtractedFieldsDetector detector = new ExtractedFieldsDetector(index, config, isTaskRestarting, + docValueFieldsLimitHolder.get(), fieldCapsResponseHolder.get(), fieldCardinalities); + listener.onResponse(detector); + }, + listener::onFailure + ); + + // Step 3. Get cardinalities for fields with limits + ActionListener fieldCapabilitiesHandler = ActionListener.wrap( + fieldCapabilitiesResponse -> { + fieldCapsResponseHolder.set(fieldCapabilitiesResponse); + getCardinalitiesForFieldsWithLimit(index, config, fieldCardinalitiesHandler); + }, + listener::onFailure + ); + + // Step 2. 
Get field capabilities necessary to build the information of how to extract fields + ActionListener docValueFieldsLimitListener = ActionListener.wrap( + docValueFieldsLimit -> { + docValueFieldsLimitHolder.set(docValueFieldsLimit); + getFieldCaps(index, config, fieldCapabilitiesHandler); + }, + listener::onFailure + ); + + // Step 1. Get doc value fields limit + getDocValueFieldsLimit(index, docValueFieldsLimitListener); + } + + private void getCardinalitiesForFieldsWithLimit(String[] index, DataFrameAnalyticsConfig config, + ActionListener> listener) { + Map fieldCardinalityLimits = config.getAnalysis().getFieldCardinalityLimits(); + if (fieldCardinalityLimits.isEmpty()) { + listener.onResponse(Collections.emptyMap()); + return; + } + + ActionListener searchListener = ActionListener.wrap( + searchResponse -> buildFieldCardinalitiesMap(config, searchResponse, listener), + listener::onFailure + ); + + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().size(0); + for (Map.Entry entry : fieldCardinalityLimits.entrySet()) { + String fieldName = entry.getKey(); + Long limit = entry.getValue(); + searchSourceBuilder.aggregation( + AggregationBuilders.cardinality(fieldName) + .field(fieldName) + .precisionThreshold(limit + 1)); + } + SearchRequest searchRequest = new SearchRequest(index).source(searchSourceBuilder); + ClientHelper.executeWithHeadersAsync( + config.getHeaders(), ClientHelper.ML_ORIGIN, client, SearchAction.INSTANCE, searchRequest, searchListener); + } + + private void buildFieldCardinalitiesMap(DataFrameAnalyticsConfig config, SearchResponse searchResponse, + ActionListener> listener) { + Aggregations aggs = searchResponse.getAggregations(); + if (aggs == null) { + listener.onFailure(ExceptionsHelper.serverError("Unexpected null response when gathering field cardinalities")); + return; + } + + Map fieldCardinalities = new HashMap<>(config.getAnalysis().getFieldCardinalityLimits().size()); + for (String field : 
config.getAnalysis().getFieldCardinalityLimits().keySet()) { + Cardinality cardinality = aggs.get(field); + if (cardinality == null) { + listener.onFailure(ExceptionsHelper.serverError("Unexpected null response when gathering field cardinalities")); + return; + } + fieldCardinalities.put(field, cardinality.getValue()); + } + listener.onResponse(fieldCardinalities); + } + + private void getFieldCaps(String[] index, DataFrameAnalyticsConfig config, ActionListener listener) { + FieldCapabilitiesRequest fieldCapabilitiesRequest = new FieldCapabilitiesRequest(); + fieldCapabilitiesRequest.indices(index); + fieldCapabilitiesRequest.indicesOptions(IndicesOptions.lenientExpandOpen()); + fieldCapabilitiesRequest.fields("*"); + ClientHelper.executeWithHeaders(config.getHeaders(), ClientHelper.ML_ORIGIN, client, () -> { + client.execute(FieldCapabilitiesAction.INSTANCE, fieldCapabilitiesRequest, listener); + // This response gets discarded - the listener handles the real response + return null; + }); + } + + private void getDocValueFieldsLimit(String[] index, ActionListener docValueFieldsLimitListener) { + ActionListener settingsListener = ActionListener.wrap(getSettingsResponse -> { + Integer minDocValueFieldsLimit = Integer.MAX_VALUE; + + ImmutableOpenMap indexToSettings = getSettingsResponse.getIndexToSettings(); + Iterator> iterator = indexToSettings.iterator(); + while (iterator.hasNext()) { + ObjectObjectCursor indexSettings = iterator.next(); + Integer indexMaxDocValueFields = IndexSettings.MAX_DOCVALUE_FIELDS_SEARCH_SETTING.get(indexSettings.value); + if (indexMaxDocValueFields < minDocValueFieldsLimit) { + minDocValueFieldsLimit = indexMaxDocValueFields; + } + } + docValueFieldsLimitListener.onResponse(minDocValueFieldsLimit); + }, + e -> { + if (ExceptionsHelper.unwrapCause(e) instanceof IndexNotFoundException) { + docValueFieldsLimitListener.onFailure(new ResourceNotFoundException("cannot retrieve data because index " + + ((IndexNotFoundException) e).getIndex() + " 
does not exist")); + } else { + docValueFieldsLimitListener.onFailure(e); + } + } + ); + + GetSettingsRequest getSettingsRequest = new GetSettingsRequest(); + getSettingsRequest.indices(index); + getSettingsRequest.includeDefaults(true); + getSettingsRequest.names(IndexSettings.MAX_DOCVALUE_FIELDS_SEARCH_SETTING.getKey()); + client.admin().indices().getSettings(getSettingsRequest, settingsListener); + } +} diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/extractor/ExtractedFieldsDetectorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/extractor/ExtractedFieldsDetectorTests.java index 7dc203dfe24..8f33c9bfbbf 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/extractor/ExtractedFieldsDetectorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/extractor/ExtractedFieldsDetectorTests.java @@ -47,7 +47,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase { .addAggregatableField("some_float", "float").build(); ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( - SOURCE_INDEX, buildOutlierDetectionConfig(), RESULTS_FIELD, false, 100, fieldCapabilities); + SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities, Collections.emptyMap()); ExtractedFields extractedFields = extractedFieldsDetector.detect(); List allFields = extractedFields.getAllFields(); @@ -62,7 +62,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase { .build(); ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( - SOURCE_INDEX, buildOutlierDetectionConfig(), RESULTS_FIELD, false, 100, fieldCapabilities); + SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities, Collections.emptyMap()); ExtractedFields extractedFields = extractedFieldsDetector.detect(); List allFields = extractedFields.getAllFields(); @@ -76,7 +76,7 @@ public class ExtractedFieldsDetectorTests 
extends ESTestCase { .addAggregatableField("some_keyword", "keyword").build(); ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( - SOURCE_INDEX, buildOutlierDetectionConfig(), RESULTS_FIELD, false, 100, fieldCapabilities); + SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities, Collections.emptyMap()); ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect()); assertThat(e.getMessage(), equalTo("No compatible fields could be detected in index [source_index]." + @@ -88,7 +88,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase { .addAggregatableField("indecisive_field", "float", "keyword").build(); ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( - SOURCE_INDEX, buildOutlierDetectionConfig(), RESULTS_FIELD, false, 100, fieldCapabilities); + SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities, Collections.emptyMap()); ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect()); assertThat(e.getMessage(), equalTo("No compatible fields could be detected in index [source_index]. 
" + @@ -104,7 +104,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase { .build(); ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( - SOURCE_INDEX, buildOutlierDetectionConfig(), RESULTS_FIELD, false, 100, fieldCapabilities); + SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities, Collections.emptyMap()); ExtractedFields extractedFields = extractedFieldsDetector.detect(); List allFields = extractedFields.getAllFields(); @@ -125,7 +125,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase { .build(); ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( - SOURCE_INDEX, buildRegressionConfig("foo"), RESULTS_FIELD, false, 100, fieldCapabilities); + SOURCE_INDEX, buildRegressionConfig("foo"), false, 100, fieldCapabilities, Collections.emptyMap()); ExtractedFields extractedFields = extractedFieldsDetector.detect(); List allFields = extractedFields.getAllFields(); @@ -144,7 +144,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase { .build(); ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( - SOURCE_INDEX, buildRegressionConfig("foo"), RESULTS_FIELD, false, 100, fieldCapabilities); + SOURCE_INDEX, buildRegressionConfig("foo"), false, 100, fieldCapabilities, Collections.emptyMap()); ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect()); assertThat(e.getMessage(), equalTo("required field [foo] is missing; analysis requires fields [foo]")); @@ -160,7 +160,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase { FetchSourceContext analyzedFields = new FetchSourceContext(true, new String[0], new String[] {"foo"}); ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( - SOURCE_INDEX, buildRegressionConfig("foo", analyzedFields), RESULTS_FIELD, false, 100, fieldCapabilities); + SOURCE_INDEX, buildRegressionConfig("foo", 
analyzedFields), false, 100, fieldCapabilities, Collections.emptyMap()); ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect()); assertThat(e.getMessage(), equalTo("required field [foo] is missing; analysis requires fields [foo]")); @@ -176,7 +176,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase { FetchSourceContext analyzedFields = new FetchSourceContext(true, new String[] {"some_float", "some_keyword"}, new String[0]); ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( - SOURCE_INDEX, buildRegressionConfig("foo", analyzedFields), RESULTS_FIELD, false, 100, fieldCapabilities); + SOURCE_INDEX, buildRegressionConfig("foo", analyzedFields), false, 100, fieldCapabilities, Collections.emptyMap()); ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect()); assertThat(e.getMessage(), equalTo("required field [foo] is missing; analysis requires fields [foo]")); @@ -190,7 +190,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase { FetchSourceContext analyzedFields = new FetchSourceContext(true, new String[] {"foo", "bar"}, new String[] {"foo"}); ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( - SOURCE_INDEX, buildOutlierDetectionConfig(analyzedFields), RESULTS_FIELD, false, 100, fieldCapabilities); + SOURCE_INDEX, buildOutlierDetectionConfig(analyzedFields), false, 100, fieldCapabilities, Collections.emptyMap()); ExtractedFields extractedFields = extractedFieldsDetector.detect(); List allFields = extractedFields.getAllFields(); @@ -207,7 +207,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase { .build(); ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( - SOURCE_INDEX, buildRegressionConfig("foo"), RESULTS_FIELD, false, 100, fieldCapabilities); + SOURCE_INDEX, buildRegressionConfig("foo"), false, 100, 
fieldCapabilities, Collections.emptyMap()); ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect()); assertThat(e.getMessage(), equalTo("invalid types [keyword] for required field [foo]; " + @@ -223,19 +223,33 @@ public class ExtractedFieldsDetectorTests extends ESTestCase { .build(); ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( - SOURCE_INDEX, buildClassificationConfig("some_float"), RESULTS_FIELD, false, 100, fieldCapabilities); + SOURCE_INDEX, buildClassificationConfig("some_float"), false, 100, fieldCapabilities, Collections.emptyMap()); ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect()); assertThat(e.getMessage(), equalTo("invalid types [float] for required field [some_float]; " + "expected types are [boolean, byte, integer, ip, keyword, long, short, text]")); } + public void testDetect_GivenClassificationAndDependentVariableHasInvalidCardinality() { + FieldCapabilitiesResponse fieldCapabilities = new MockFieldCapsResponseBuilder() + .addAggregatableField("some_long", "long") + .addAggregatableField("some_keyword", "keyword") + .addAggregatableField("foo", "keyword") + .build(); + + ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(SOURCE_INDEX, + buildClassificationConfig("some_keyword"), false, 100, fieldCapabilities, Collections.singletonMap("some_keyword", 3L)); + ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect()); + + assertThat(e.getMessage(), equalTo("Field [some_keyword] must have at most [2] distinct values but there were at least [3]")); + } + public void testDetect_GivenIgnoredField() { FieldCapabilitiesResponse fieldCapabilities = new MockFieldCapsResponseBuilder() .addAggregatableField("_id", "float").build(); ExtractedFieldsDetector extractedFieldsDetector = new 
ExtractedFieldsDetector( - SOURCE_INDEX, buildOutlierDetectionConfig(), RESULTS_FIELD, false, 100, fieldCapabilities); + SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities, Collections.emptyMap()); ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect()); assertThat(e.getMessage(), equalTo("No compatible fields could be detected in index [source_index]. " + @@ -248,7 +262,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase { FetchSourceContext analyzedFields = new FetchSourceContext(true, new String[]{"_id"}, new String[0]); ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( - SOURCE_INDEX, buildOutlierDetectionConfig(analyzedFields), RESULTS_FIELD, false, 100, fieldCapabilities); + SOURCE_INDEX, buildOutlierDetectionConfig(analyzedFields), false, 100, fieldCapabilities, Collections.emptyMap()); ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect()); assertThat(e.getMessage(), equalTo("field [_id] cannot be analyzed")); @@ -270,7 +284,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase { FieldCapabilitiesResponse fieldCapabilities = mockFieldCapsResponseBuilder.build(); ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( - SOURCE_INDEX, buildOutlierDetectionConfig(), RESULTS_FIELD, false, 100, fieldCapabilities); + SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities, Collections.emptyMap()); ExtractedFields extractedFields = extractedFieldsDetector.detect(); List extractedFieldNames = extractedFields.getAllFields().stream().map(ExtractedField::getName) @@ -287,7 +301,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase { FetchSourceContext desiredFields = new FetchSourceContext(true, new String[]{"your_field1", "my*"}, new String[0]); ExtractedFieldsDetector extractedFieldsDetector = new 
ExtractedFieldsDetector( - SOURCE_INDEX, buildOutlierDetectionConfig(desiredFields), RESULTS_FIELD, false, 100, fieldCapabilities); + SOURCE_INDEX, buildOutlierDetectionConfig(desiredFields), false, 100, fieldCapabilities, Collections.emptyMap()); ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect()); assertThat(e.getMessage(), equalTo("No field [your_field1] could be detected")); @@ -302,7 +316,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase { FetchSourceContext desiredFields = new FetchSourceContext(true, new String[0], new String[]{"my_*"}); ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( - SOURCE_INDEX, buildOutlierDetectionConfig(desiredFields), RESULTS_FIELD, false, 100, fieldCapabilities); + SOURCE_INDEX, buildOutlierDetectionConfig(desiredFields), false, 100, fieldCapabilities, Collections.emptyMap()); ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect()); assertThat(e.getMessage(), equalTo("No compatible fields could be detected in index [source_index]. 
" + "Supported types are [boolean, byte, double, float, half_float, integer, long, scaled_float, short].")); @@ -318,7 +332,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase { FetchSourceContext desiredFields = new FetchSourceContext(true, new String[]{"your*", "my_*"}, new String[]{"*nope"}); ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( - SOURCE_INDEX, buildOutlierDetectionConfig(desiredFields), RESULTS_FIELD, false, 100, fieldCapabilities); + SOURCE_INDEX, buildOutlierDetectionConfig(desiredFields), false, 100, fieldCapabilities, Collections.emptyMap()); ExtractedFields extractedFields = extractedFieldsDetector.detect(); List extractedFieldNames = extractedFields.getAllFields().stream().map(ExtractedField::getName) @@ -337,7 +351,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase { FetchSourceContext desiredFields = new FetchSourceContext(true, new String[]{"your*", "my_*"}, new String[]{"*nope"}); ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( - SOURCE_INDEX, buildOutlierDetectionConfig(desiredFields), RESULTS_FIELD, false, 100, fieldCapabilities); + SOURCE_INDEX, buildOutlierDetectionConfig(desiredFields), false, 100, fieldCapabilities, Collections.emptyMap()); ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect()); assertThat(e.getMessage(), equalTo("field [your_keyword] has unsupported type [keyword]. 
" + @@ -353,7 +367,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase { .build(); ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( - SOURCE_INDEX, buildOutlierDetectionConfig(), RESULTS_FIELD, false, 100, fieldCapabilities); + SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities, Collections.emptyMap()); ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect()); assertThat(e.getMessage(), equalTo("A field that matches the dest.results_field [ml] already exists; " + @@ -369,7 +383,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase { .build(); ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( - SOURCE_INDEX, buildOutlierDetectionConfig(), RESULTS_FIELD, true, 100, fieldCapabilities); + SOURCE_INDEX, buildOutlierDetectionConfig(), true, 100, fieldCapabilities, Collections.emptyMap()); ExtractedFields extractedFields = extractedFieldsDetector.detect(); List extractedFieldNames = extractedFields.getAllFields().stream().map(ExtractedField::getName) @@ -387,7 +401,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase { FetchSourceContext analyzedFields = new FetchSourceContext(true, new String[]{RESULTS_FIELD}, new String[0]); ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( - SOURCE_INDEX, buildOutlierDetectionConfig(analyzedFields), RESULTS_FIELD, false, 100, fieldCapabilities); + SOURCE_INDEX, buildOutlierDetectionConfig(analyzedFields), false, 100, fieldCapabilities, Collections.emptyMap()); ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect()); assertThat(e.getMessage(), equalTo("A field that matches the dest.results_field [ml] already exists; " + @@ -404,29 +418,12 @@ public class ExtractedFieldsDetectorTests extends ESTestCase { FetchSourceContext analyzedFields = new 
FetchSourceContext(true, new String[]{RESULTS_FIELD}, new String[0]); ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( - SOURCE_INDEX, buildOutlierDetectionConfig(analyzedFields), RESULTS_FIELD, true, 100, fieldCapabilities); + SOURCE_INDEX, buildOutlierDetectionConfig(analyzedFields), true, 100, fieldCapabilities, Collections.emptyMap()); ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect()); assertThat(e.getMessage(), equalTo("No field [ml] could be detected")); } - public void testDetect_NullResultsField() { - FieldCapabilitiesResponse fieldCapabilities = new MockFieldCapsResponseBuilder() - .addAggregatableField(RESULTS_FIELD, "float") - .addAggregatableField("my_field1", "float") - .addAggregatableField("your_field2", "float") - .addAggregatableField("your_keyword", "keyword") - .build(); - - ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( - SOURCE_INDEX, buildOutlierDetectionConfig(), null, false, 100, fieldCapabilities); - ExtractedFields extractedFields = extractedFieldsDetector.detect(); - - List extractedFieldNames = extractedFields.getAllFields().stream().map(ExtractedField::getName) - .collect(Collectors.toList()); - assertThat(extractedFieldNames, equalTo(Arrays.asList(RESULTS_FIELD, "my_field1", "your_field2"))); - } - public void testDetect_GivenLessFieldsThanDocValuesLimit() { FieldCapabilitiesResponse fieldCapabilities = new MockFieldCapsResponseBuilder() .addAggregatableField("field_1", "float") @@ -436,7 +433,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase { .build(); ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( - SOURCE_INDEX, buildOutlierDetectionConfig(), RESULTS_FIELD, true, 4, fieldCapabilities); + SOURCE_INDEX, buildOutlierDetectionConfig(), true, 4, fieldCapabilities, Collections.emptyMap()); ExtractedFields extractedFields = extractedFieldsDetector.detect(); 
List extractedFieldNames = extractedFields.getAllFields().stream().map(ExtractedField::getName) @@ -455,7 +452,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase { .build(); ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( - SOURCE_INDEX, buildOutlierDetectionConfig(), RESULTS_FIELD, true, 3, fieldCapabilities); + SOURCE_INDEX, buildOutlierDetectionConfig(), true, 3, fieldCapabilities, Collections.emptyMap()); ExtractedFields extractedFields = extractedFieldsDetector.detect(); List extractedFieldNames = extractedFields.getAllFields().stream().map(ExtractedField::getName) @@ -474,7 +471,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase { .build(); ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( - SOURCE_INDEX, buildOutlierDetectionConfig(), RESULTS_FIELD, true, 2, fieldCapabilities); + SOURCE_INDEX, buildOutlierDetectionConfig(), true, 2, fieldCapabilities, Collections.emptyMap()); ExtractedFields extractedFields = extractedFieldsDetector.detect(); List extractedFieldNames = extractedFields.getAllFields().stream().map(ExtractedField::getName) @@ -490,7 +487,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase { .build(); ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( - SOURCE_INDEX, buildOutlierDetectionConfig(), RESULTS_FIELD, false, 100, fieldCapabilities); + SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities, Collections.emptyMap()); ExtractedFields extractedFields = extractedFieldsDetector.detect(); List allFields = extractedFields.getAllFields(); @@ -515,7 +512,8 @@ public class ExtractedFieldsDetectorTests extends ESTestCase { .build(); ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( - SOURCE_INDEX, buildClassificationConfig("some_boolean"), RESULTS_FIELD, false, 100, fieldCapabilities); + SOURCE_INDEX, buildClassificationConfig("some_boolean"), false, 100, 
fieldCapabilities, + Collections.singletonMap("some_boolean", 2L)); ExtractedFields extractedFields = extractedFieldsDetector.detect(); List allFields = extractedFields.getAllFields(); @@ -547,7 +545,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase { .build(); ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( - SOURCE_INDEX, buildRegressionConfig("a_float"), RESULTS_FIELD, true, 100, fieldCapabilities); + SOURCE_INDEX, buildRegressionConfig("a_float"), true, 100, fieldCapabilities, Collections.emptyMap()); ExtractedFields extractedFields = extractedFieldsDetector.detect(); assertThat(extractedFields.getAllFields().size(), equalTo(5)); @@ -564,7 +562,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase { .build(); ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( - SOURCE_INDEX, buildClassificationConfig("field_1"), RESULTS_FIELD, true, 100, fieldCapabilities); + SOURCE_INDEX, buildClassificationConfig("field_1"), true, 100, fieldCapabilities, Collections.singletonMap("field_1", 2L)); ExtractedFields extractedFields = extractedFieldsDetector.detect(); assertThat(extractedFields.getAllFields().size(), equalTo(2)); @@ -581,7 +579,8 @@ public class ExtractedFieldsDetectorTests extends ESTestCase { .build(); ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( - SOURCE_INDEX, buildClassificationConfig("field_1.keyword"), RESULTS_FIELD, true, 100, fieldCapabilities); + SOURCE_INDEX, buildClassificationConfig("field_1.keyword"), true, 100, fieldCapabilities, + Collections.singletonMap("field_1.keyword", 2L)); ExtractedFields extractedFields = extractedFieldsDetector.detect(); assertThat(extractedFields.getAllFields().size(), equalTo(2)); @@ -600,7 +599,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase { .build(); ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( - SOURCE_INDEX, buildRegressionConfig("field_2"), 
RESULTS_FIELD, true, 100, fieldCapabilities); + SOURCE_INDEX, buildRegressionConfig("field_2"), true, 100, fieldCapabilities, Collections.emptyMap()); ExtractedFields extractedFields = extractedFieldsDetector.detect(); assertThat(extractedFields.getAllFields().size(), equalTo(2)); @@ -617,7 +616,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase { .build(); ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( - SOURCE_INDEX, buildRegressionConfig("field_2"), RESULTS_FIELD, true, 0, fieldCapabilities); + SOURCE_INDEX, buildRegressionConfig("field_2"), true, 0, fieldCapabilities, Collections.emptyMap()); ExtractedFields extractedFields = extractedFieldsDetector.detect(); assertThat(extractedFields.getAllFields().size(), equalTo(2)); @@ -635,7 +634,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase { .build(); ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( - SOURCE_INDEX, buildRegressionConfig("field_2.double"), RESULTS_FIELD, true, 100, fieldCapabilities); + SOURCE_INDEX, buildRegressionConfig("field_2.double"), true, 100, fieldCapabilities, Collections.emptyMap()); ExtractedFields extractedFields = extractedFieldsDetector.detect(); assertThat(extractedFields.getAllFields().size(), equalTo(2)); @@ -652,7 +651,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase { .build(); ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( - SOURCE_INDEX, buildRegressionConfig("field_2"), RESULTS_FIELD, true, 100, fieldCapabilities); + SOURCE_INDEX, buildRegressionConfig("field_2"), true, 100, fieldCapabilities, Collections.emptyMap()); ExtractedFields extractedFields = extractedFieldsDetector.detect(); assertThat(extractedFields.getAllFields().size(), equalTo(2)); @@ -670,7 +669,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase { FetchSourceContext analyzedFields = new FetchSourceContext(true, new String[] { "field_1", "field_2" }, new 
String[0]); ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( - SOURCE_INDEX, buildRegressionConfig("field_2", analyzedFields), RESULTS_FIELD, false, 100, fieldCapabilities); + SOURCE_INDEX, buildRegressionConfig("field_2", analyzedFields), false, 100, fieldCapabilities, Collections.emptyMap()); ExtractedFields extractedFields = extractedFieldsDetector.detect(); assertThat(extractedFields.getAllFields().size(), equalTo(2));