This commit moves the async calls required to retrieve the components that make up `ExtractedFieldsExtractor` out of `DataFrameDataExtractorFactory` and into a dedicated `ExtractorFieldsExtractorFactory` class. A few more refactorings are performed: - The detector no longer needs the results field. Instead, it knows whether to use it or not based on whether the task is restarting. - We pass more accurately whether the task is restarting or not. - The validation of whether fields that have a cardinality limit are valid is now performed in the detector after retrieving the respective cardinalities. Backport of #49315
This commit is contained in:
parent
543f5f4faf
commit
4d6e037e90
|
@ -81,6 +81,7 @@ public class TransportEstimateMemoryUsageAction
|
|||
DataFrameDataExtractorFactory.createForSourceIndices(
|
||||
client,
|
||||
taskId,
|
||||
true, // We are not interested in first-time run validations here
|
||||
request.getConfig(),
|
||||
ActionListener.wrap(
|
||||
dataExtractorFactory -> {
|
||||
|
|
|
@ -64,6 +64,7 @@ import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsTask;
|
|||
import org.elasticsearch.xpack.ml.dataframe.MappingsMerger;
|
||||
import org.elasticsearch.xpack.ml.dataframe.SourceDestValidator;
|
||||
import org.elasticsearch.xpack.ml.dataframe.extractor.DataFrameDataExtractorFactory;
|
||||
import org.elasticsearch.xpack.ml.dataframe.extractor.ExtractedFieldsDetectorFactory;
|
||||
import org.elasticsearch.xpack.ml.dataframe.persistence.DataFrameAnalyticsConfigProvider;
|
||||
import org.elasticsearch.xpack.ml.job.JobNodeSelector;
|
||||
import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor;
|
||||
|
@ -228,33 +229,7 @@ public class TransportStartDataFrameAnalyticsAction
|
|||
|
||||
// Step 6. Validate that there are analyzable data in the source index
|
||||
ActionListener<StartContext> validateMappingsMergeListener = ActionListener.wrap(
|
||||
startContext -> DataFrameDataExtractorFactory.createForSourceIndices(client,
|
||||
"validate_source_index_has_rows-" + id,
|
||||
startContext.config,
|
||||
ActionListener.wrap(
|
||||
dataFrameDataExtractorFactory ->
|
||||
dataFrameDataExtractorFactory
|
||||
.newExtractor(false)
|
||||
.collectDataSummaryAsync(ActionListener.wrap(
|
||||
dataSummary -> {
|
||||
if (dataSummary.rows == 0) {
|
||||
finalListener.onFailure(ExceptionsHelper.badRequestException(
|
||||
"Unable to start {} as no documents in the source indices [{}] contained all the fields "
|
||||
+ "selected for analysis. If you are relying on automatic field selection then there are "
|
||||
+ "currently mapped fields that do not exist in any indexed documents, and you will have "
|
||||
+ "to switch to explicit field selection and include only fields that exist in indexed "
|
||||
+ "documents.",
|
||||
id, Strings.arrayToCommaDelimitedString(startContext.config.getSource().getIndex())
|
||||
));
|
||||
} else {
|
||||
finalListener.onResponse(startContext);
|
||||
}
|
||||
},
|
||||
finalListener::onFailure
|
||||
)),
|
||||
finalListener::onFailure
|
||||
))
|
||||
,
|
||||
startContext -> validateSourceIndexHasRows(startContext, finalListener),
|
||||
finalListener::onFailure
|
||||
);
|
||||
|
||||
|
@ -269,9 +244,7 @@ public class TransportStartDataFrameAnalyticsAction
|
|||
// Step 4. Validate dest index is empty if task is starting for first time
|
||||
ActionListener<StartContext> toValidateDestEmptyListener = ActionListener.wrap(
|
||||
startContext -> {
|
||||
DataFrameAnalyticsTask.StartingState startingState = DataFrameAnalyticsTask.determineStartingState(
|
||||
startContext.config.getId(), startContext.progressOnStart);
|
||||
switch (startingState) {
|
||||
switch (startContext.startingState) {
|
||||
case FIRST_TIME:
|
||||
checkDestIndexIsEmptyIfExists(startContext, toValidateMappingsListener);
|
||||
break;
|
||||
|
@ -285,7 +258,7 @@ public class TransportStartDataFrameAnalyticsAction
|
|||
"Cannot start because the job has already finished"));
|
||||
break;
|
||||
default:
|
||||
finalListener.onFailure(ExceptionsHelper.serverError("Unexpected starting state " + startingState));
|
||||
finalListener.onFailure(ExceptionsHelper.serverError("Unexpected starting state " + startContext.startingState));
|
||||
break;
|
||||
}
|
||||
},
|
||||
|
@ -295,9 +268,16 @@ public class TransportStartDataFrameAnalyticsAction
|
|||
// Step 3. Validate source and dest; check data extraction is possible
|
||||
ActionListener<StartContext> startContextListener = ActionListener.wrap(
|
||||
startContext -> {
|
||||
// Validate the query parses
|
||||
startContext.config.getSource().getParsedQuery();
|
||||
|
||||
// Validate source/dest are valid
|
||||
new SourceDestValidator(clusterService.state(), indexNameExpressionResolver).check(startContext.config);
|
||||
DataFrameDataExtractorFactory.validateConfigAndSourceIndex(client, startContext.config, ActionListener.wrap(
|
||||
config -> toValidateDestEmptyListener.onResponse(startContext), finalListener::onFailure));
|
||||
|
||||
// Validate extraction is possible
|
||||
boolean isTaskRestarting = startContext.startingState != DataFrameAnalyticsTask.StartingState.FIRST_TIME;
|
||||
new ExtractedFieldsDetectorFactory(client).createFromSource(startContext.config, isTaskRestarting, ActionListener.wrap(
|
||||
extractedFieldsDetector -> toValidateDestEmptyListener.onResponse(startContext), finalListener::onFailure));
|
||||
},
|
||||
finalListener::onFailure
|
||||
);
|
||||
|
@ -313,6 +293,38 @@ public class TransportStartDataFrameAnalyticsAction
|
|||
configProvider.get(id, getConfigListener);
|
||||
}
|
||||
|
||||
private void validateSourceIndexHasRows(StartContext startContext, ActionListener<StartContext> listener) {
|
||||
boolean isTaskRestarting = startContext.startingState != DataFrameAnalyticsTask.StartingState.FIRST_TIME;
|
||||
DataFrameDataExtractorFactory.createForSourceIndices(client,
|
||||
"validate_source_index_has_rows-" + startContext.config.getId(),
|
||||
isTaskRestarting,
|
||||
startContext.config,
|
||||
ActionListener.wrap(
|
||||
dataFrameDataExtractorFactory ->
|
||||
dataFrameDataExtractorFactory
|
||||
.newExtractor(false)
|
||||
.collectDataSummaryAsync(ActionListener.wrap(
|
||||
dataSummary -> {
|
||||
if (dataSummary.rows == 0) {
|
||||
listener.onFailure(ExceptionsHelper.badRequestException(
|
||||
"Unable to start {} as no documents in the source indices [{}] contained all the fields "
|
||||
+ "selected for analysis. If you are relying on automatic field selection then there are "
|
||||
+ "currently mapped fields that do not exist in any indexed documents, and you will have "
|
||||
+ "to switch to explicit field selection and include only fields that exist in indexed "
|
||||
+ "documents.",
|
||||
startContext.config.getId(),
|
||||
Strings.arrayToCommaDelimitedString(startContext.config.getSource().getIndex())
|
||||
));
|
||||
} else {
|
||||
listener.onResponse(startContext);
|
||||
}
|
||||
},
|
||||
listener::onFailure
|
||||
)),
|
||||
listener::onFailure
|
||||
));
|
||||
}
|
||||
|
||||
private void getProgress(DataFrameAnalyticsConfig config, ActionListener<List<PhaseProgress>> listener) {
|
||||
GetDataFrameAnalyticsStatsAction.Request getStatsRequest = new GetDataFrameAnalyticsStatsAction.Request(config.getId());
|
||||
executeAsyncWithOrigin(client, ML_ORIGIN, GetDataFrameAnalyticsStatsAction.INSTANCE, getStatsRequest, ActionListener.wrap(
|
||||
|
@ -389,10 +401,12 @@ public class TransportStartDataFrameAnalyticsAction
|
|||
private static class StartContext {
|
||||
private final DataFrameAnalyticsConfig config;
|
||||
private final List<PhaseProgress> progressOnStart;
|
||||
private final DataFrameAnalyticsTask.StartingState startingState;
|
||||
|
||||
private StartContext(DataFrameAnalyticsConfig config, List<PhaseProgress> progressOnStart) {
|
||||
this.config = config;
|
||||
this.progressOnStart = progressOnStart;
|
||||
this.startingState = DataFrameAnalyticsTask.determineStartingState(config.getId(), progressOnStart);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -5,43 +5,20 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.ml.dataframe.extractor;
|
||||
|
||||
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
|
||||
import org.elasticsearch.ResourceNotFoundException;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
|
||||
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
|
||||
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesAction;
|
||||
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest;
|
||||
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
|
||||
import org.elasticsearch.action.search.SearchAction;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.IndexNotFoundException;
|
||||
import org.elasticsearch.index.IndexSettings;
|
||||
import org.elasticsearch.index.query.BoolQueryBuilder;
|
||||
import org.elasticsearch.index.query.QueryBuilder;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.elasticsearch.search.aggregations.AggregationBuilders;
|
||||
import org.elasticsearch.search.aggregations.Aggregations;
|
||||
import org.elasticsearch.search.aggregations.metrics.Cardinality;
|
||||
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
||||
import org.elasticsearch.xpack.core.ClientHelper;
|
||||
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
|
||||
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
|
||||
import org.elasticsearch.xpack.ml.extractor.ExtractedField;
|
||||
import org.elasticsearch.xpack.ml.extractor.ExtractedFields;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Iterator;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
public class DataFrameDataExtractorFactory {
|
||||
|
||||
|
@ -94,27 +71,27 @@ public class DataFrameDataExtractorFactory {
|
|||
* The source index must exist and contain at least 1 compatible field or validations will fail.
|
||||
*
|
||||
* @param client ES Client used to make calls against the cluster
|
||||
* @param taskId The task id
|
||||
* @param isTaskRestarting Whether the task is restarting or it is running for the first time
|
||||
* @param config The config from which to create the extractor factory
|
||||
* @param listener The listener to notify on creation or failure
|
||||
*/
|
||||
public static void createForSourceIndices(Client client,
|
||||
String taskId,
|
||||
boolean isTaskRestarting,
|
||||
DataFrameAnalyticsConfig config,
|
||||
ActionListener<DataFrameDataExtractorFactory> listener) {
|
||||
validateIndexAndExtractFields(
|
||||
client,
|
||||
config.getSource().getIndex(),
|
||||
config,
|
||||
null,
|
||||
false,
|
||||
ActionListener.wrap(
|
||||
extractedFields -> listener.onResponse(
|
||||
new DataFrameDataExtractorFactory(
|
||||
client, taskId, Arrays.asList(config.getSource().getIndex()), extractedFields, config.getHeaders(),
|
||||
config.getAnalysis().supportsMissingValues())),
|
||||
ExtractedFieldsDetectorFactory extractedFieldsDetectorFactory = new ExtractedFieldsDetectorFactory(client);
|
||||
extractedFieldsDetectorFactory.createFromSource(config, isTaskRestarting, ActionListener.wrap(
|
||||
extractedFieldsDetector -> {
|
||||
ExtractedFields extractedFields = extractedFieldsDetector.detect();
|
||||
DataFrameDataExtractorFactory extractorFactory = new DataFrameDataExtractorFactory(client, taskId,
|
||||
Arrays.asList(config.getSource().getIndex()), extractedFields, config.getHeaders(),
|
||||
config.getAnalysis().supportsMissingValues());
|
||||
listener.onResponse(extractorFactory);
|
||||
},
|
||||
listener::onFailure
|
||||
)
|
||||
);
|
||||
));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -131,168 +108,16 @@ public class DataFrameDataExtractorFactory {
|
|||
DataFrameAnalyticsConfig config,
|
||||
boolean isTaskRestarting,
|
||||
ActionListener<DataFrameDataExtractorFactory> listener) {
|
||||
validateIndexAndExtractFields(
|
||||
client,
|
||||
new String[] {config.getDest().getIndex()},
|
||||
config,
|
||||
config.getDest().getResultsField(),
|
||||
isTaskRestarting,
|
||||
ActionListener.wrap(
|
||||
extractedFields -> listener.onResponse(
|
||||
new DataFrameDataExtractorFactory(
|
||||
client, config.getId(), Arrays.asList(config.getDest().getIndex()), extractedFields, config.getHeaders(),
|
||||
config.getAnalysis().supportsMissingValues())),
|
||||
listener::onFailure
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Validates the source index and analytics config
|
||||
*
|
||||
* @param client ES Client to make calls
|
||||
* @param config Analytics config to validate
|
||||
* @param listener The listener to notify on failure or completion
|
||||
*/
|
||||
public static void validateConfigAndSourceIndex(Client client,
|
||||
DataFrameAnalyticsConfig config,
|
||||
ActionListener<DataFrameAnalyticsConfig> listener) {
|
||||
validateIndexAndExtractFields(
|
||||
client,
|
||||
config.getSource().getIndex(),
|
||||
config,
|
||||
config.getDest().getResultsField(),
|
||||
false,
|
||||
ActionListener.wrap(
|
||||
fields -> {
|
||||
config.getSource().getParsedQuery(); // validate query is acceptable
|
||||
listener.onResponse(config);
|
||||
ExtractedFieldsDetectorFactory extractedFieldsDetectorFactory = new ExtractedFieldsDetectorFactory(client);
|
||||
extractedFieldsDetectorFactory.createFromDest(config, isTaskRestarting, ActionListener.wrap(
|
||||
extractedFieldsDetector -> {
|
||||
ExtractedFields extractedFields = extractedFieldsDetector.detect();
|
||||
DataFrameDataExtractorFactory extractorFactory = new DataFrameDataExtractorFactory(client, config.getId(),
|
||||
Collections.singletonList(config.getDest().getIndex()), extractedFields, config.getHeaders(),
|
||||
config.getAnalysis().supportsMissingValues());
|
||||
listener.onResponse(extractorFactory);
|
||||
},
|
||||
listener::onFailure
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
private static void validateIndexAndExtractFields(Client client,
|
||||
String[] index,
|
||||
DataFrameAnalyticsConfig config,
|
||||
String resultsField,
|
||||
boolean isTaskRestarting,
|
||||
ActionListener<ExtractedFields> listener) {
|
||||
AtomicInteger docValueFieldsLimitHolder = new AtomicInteger();
|
||||
AtomicReference<ExtractedFields> extractedFieldsHolder = new AtomicReference<>();
|
||||
|
||||
// Step 4. Check fields cardinality vs limits and notify listener
|
||||
ActionListener<SearchResponse> checkCardinalityHandler = ActionListener.wrap(
|
||||
searchResponse -> {
|
||||
if (searchResponse != null) {
|
||||
Aggregations aggs = searchResponse.getAggregations();
|
||||
if (aggs == null) {
|
||||
listener.onFailure(ExceptionsHelper.serverError("Unexpected null response when gathering field cardinalities"));
|
||||
return;
|
||||
}
|
||||
for (Map.Entry<String, Long> entry : config.getAnalysis().getFieldCardinalityLimits().entrySet()) {
|
||||
String fieldName = entry.getKey();
|
||||
Long limit = entry.getValue();
|
||||
Cardinality cardinality = aggs.get(fieldName);
|
||||
if (cardinality == null) {
|
||||
listener.onFailure(ExceptionsHelper.serverError("Unexpected null response when gathering field cardinalities"));
|
||||
return;
|
||||
}
|
||||
if (cardinality.getValue() > limit) {
|
||||
listener.onFailure(
|
||||
ExceptionsHelper.badRequestException(
|
||||
"Field [{}] must have at most [{}] distinct values but there were at least [{}]",
|
||||
fieldName, limit, cardinality.getValue()));
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
listener.onResponse(extractedFieldsHolder.get());
|
||||
},
|
||||
listener::onFailure
|
||||
);
|
||||
|
||||
// Step 3. Extract fields (if possible)
|
||||
ActionListener<FieldCapabilitiesResponse> fieldCapabilitiesHandler = ActionListener.wrap(
|
||||
fieldCapabilitiesResponse -> {
|
||||
extractedFieldsHolder.set(
|
||||
new ExtractedFieldsDetector(
|
||||
index, config, resultsField, isTaskRestarting, docValueFieldsLimitHolder.get(), fieldCapabilitiesResponse)
|
||||
.detect());
|
||||
|
||||
Map<String, Long> fieldCardinalityLimits = config.getAnalysis().getFieldCardinalityLimits();
|
||||
if (fieldCardinalityLimits.isEmpty()) {
|
||||
checkCardinalityHandler.onResponse(null);
|
||||
} else {
|
||||
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().size(0);
|
||||
for (Map.Entry<String, Long> entry : fieldCardinalityLimits.entrySet()) {
|
||||
String fieldName = entry.getKey();
|
||||
Long limit = entry.getValue();
|
||||
searchSourceBuilder.aggregation(
|
||||
AggregationBuilders.cardinality(fieldName)
|
||||
.field(fieldName)
|
||||
.precisionThreshold(limit + 1));
|
||||
}
|
||||
SearchRequest searchRequest = new SearchRequest(config.getSource().getIndex()).source(searchSourceBuilder);
|
||||
ClientHelper.executeWithHeadersAsync(
|
||||
config.getHeaders(), ClientHelper.ML_ORIGIN, client, SearchAction.INSTANCE, searchRequest, checkCardinalityHandler);
|
||||
}
|
||||
},
|
||||
listener::onFailure
|
||||
);
|
||||
|
||||
// Step 2. Get field capabilities necessary to build the information of how to extract fields
|
||||
ActionListener<Integer> docValueFieldsLimitListener = ActionListener.wrap(
|
||||
docValueFieldsLimit -> {
|
||||
docValueFieldsLimitHolder.set(docValueFieldsLimit);
|
||||
|
||||
FieldCapabilitiesRequest fieldCapabilitiesRequest = new FieldCapabilitiesRequest();
|
||||
fieldCapabilitiesRequest.indices(index);
|
||||
fieldCapabilitiesRequest.indicesOptions(IndicesOptions.lenientExpandOpen());
|
||||
fieldCapabilitiesRequest.fields("*");
|
||||
ClientHelper.executeWithHeaders(config.getHeaders(), ClientHelper.ML_ORIGIN, client, () -> {
|
||||
client.execute(FieldCapabilitiesAction.INSTANCE, fieldCapabilitiesRequest, fieldCapabilitiesHandler);
|
||||
// This response gets discarded - the listener handles the real response
|
||||
return null;
|
||||
});
|
||||
},
|
||||
listener::onFailure
|
||||
);
|
||||
|
||||
// Step 1. Get doc value fields limit
|
||||
getDocValueFieldsLimit(client, index, docValueFieldsLimitListener);
|
||||
}
|
||||
|
||||
private static void getDocValueFieldsLimit(Client client, String[] index, ActionListener<Integer> docValueFieldsLimitListener) {
|
||||
ActionListener<GetSettingsResponse> settingsListener = ActionListener.wrap(getSettingsResponse -> {
|
||||
Integer minDocValueFieldsLimit = Integer.MAX_VALUE;
|
||||
|
||||
ImmutableOpenMap<String, Settings> indexToSettings = getSettingsResponse.getIndexToSettings();
|
||||
Iterator<ObjectObjectCursor<String, Settings>> iterator = indexToSettings.iterator();
|
||||
while (iterator.hasNext()) {
|
||||
ObjectObjectCursor<String, Settings> indexSettings = iterator.next();
|
||||
Integer indexMaxDocValueFields = IndexSettings.MAX_DOCVALUE_FIELDS_SEARCH_SETTING.get(indexSettings.value);
|
||||
if (indexMaxDocValueFields < minDocValueFieldsLimit) {
|
||||
minDocValueFieldsLimit = indexMaxDocValueFields;
|
||||
}
|
||||
}
|
||||
docValueFieldsLimitListener.onResponse(minDocValueFieldsLimit);
|
||||
},
|
||||
e -> {
|
||||
if (ExceptionsHelper.unwrapCause(e) instanceof IndexNotFoundException) {
|
||||
docValueFieldsLimitListener.onFailure(new ResourceNotFoundException("cannot retrieve data because index "
|
||||
+ ((IndexNotFoundException) e).getIndex() + " does not exist"));
|
||||
} else {
|
||||
docValueFieldsLimitListener.onFailure(e);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
GetSettingsRequest getSettingsRequest = new GetSettingsRequest();
|
||||
getSettingsRequest.indices(index);
|
||||
getSettingsRequest.includeDefaults(true);
|
||||
getSettingsRequest.names(IndexSettings.MAX_DOCVALUE_FIELDS_SEARCH_SETTING.getKey());
|
||||
client.admin().indices().getSettings(getSettingsRequest, settingsListener);
|
||||
));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -52,19 +52,20 @@ public class ExtractedFieldsDetector {
|
|||
|
||||
private final String[] index;
|
||||
private final DataFrameAnalyticsConfig config;
|
||||
private final String resultsField;
|
||||
private final boolean isTaskRestarting;
|
||||
private final int docValueFieldsLimit;
|
||||
private final FieldCapabilitiesResponse fieldCapabilitiesResponse;
|
||||
private final Map<String, Long> fieldCardinalities;
|
||||
|
||||
ExtractedFieldsDetector(String[] index, DataFrameAnalyticsConfig config, String resultsField, boolean isTaskRestarting,
|
||||
int docValueFieldsLimit, FieldCapabilitiesResponse fieldCapabilitiesResponse) {
|
||||
ExtractedFieldsDetector(String[] index, DataFrameAnalyticsConfig config, boolean isTaskRestarting,
|
||||
int docValueFieldsLimit, FieldCapabilitiesResponse fieldCapabilitiesResponse,
|
||||
Map<String, Long> fieldCardinalities) {
|
||||
this.index = Objects.requireNonNull(index);
|
||||
this.config = Objects.requireNonNull(config);
|
||||
this.resultsField = resultsField;
|
||||
this.isTaskRestarting = isTaskRestarting;
|
||||
this.docValueFieldsLimit = docValueFieldsLimit;
|
||||
this.fieldCapabilitiesResponse = Objects.requireNonNull(fieldCapabilitiesResponse);
|
||||
this.fieldCardinalities = Objects.requireNonNull(fieldCardinalities);
|
||||
}
|
||||
|
||||
public ExtractedFields detect() {
|
||||
|
@ -79,11 +80,13 @@ public class ExtractedFieldsDetector {
|
|||
checkNoIgnoredFields(fields);
|
||||
checkFieldsHaveCompatibleTypes(fields);
|
||||
checkRequiredFields(fields);
|
||||
checkFieldsWithCardinalityLimit();
|
||||
return detectExtractedFields(fields);
|
||||
}
|
||||
|
||||
private Set<String> getIncludedFields() {
|
||||
Set<String> fields = new HashSet<>(fieldCapabilitiesResponse.get().keySet());
|
||||
checkResultsFieldIsNotPresent();
|
||||
removeFieldsUnderResultsField(fields);
|
||||
FetchSourceContext analyzedFields = config.getAnalyzedFields();
|
||||
|
||||
|
@ -96,21 +99,13 @@ public class ExtractedFieldsDetector {
|
|||
return fields;
|
||||
}
|
||||
|
||||
private void removeFieldsUnderResultsField(Set<String> fields) {
|
||||
if (resultsField == null) {
|
||||
return;
|
||||
}
|
||||
checkResultsFieldIsNotPresent();
|
||||
// Ignore fields under the results object
|
||||
fields.removeIf(field -> field.startsWith(resultsField + "."));
|
||||
}
|
||||
|
||||
private void checkResultsFieldIsNotPresent() {
|
||||
// If the task is restarting we do not mind the index containing the results field, we will overwrite all docs
|
||||
if (isTaskRestarting) {
|
||||
return;
|
||||
}
|
||||
|
||||
String resultsField = config.getDest().getResultsField();
|
||||
Map<String, FieldCapabilities> indexToFieldCaps = fieldCapabilitiesResponse.getField(resultsField);
|
||||
if (indexToFieldCaps != null && indexToFieldCaps.isEmpty() == false) {
|
||||
throw ExceptionsHelper.badRequestException(
|
||||
|
@ -122,6 +117,11 @@ public class ExtractedFieldsDetector {
|
|||
}
|
||||
}
|
||||
|
||||
private void removeFieldsUnderResultsField(Set<String> fields) {
|
||||
// Ignore fields under the results object
|
||||
fields.removeIf(field -> field.startsWith(config.getDest().getResultsField() + "."));
|
||||
}
|
||||
|
||||
private void removeFieldsWithIncompatibleTypes(Set<String> fields) {
|
||||
Iterator<String> fieldsIterator = fields.iterator();
|
||||
while (fieldsIterator.hasNext()) {
|
||||
|
@ -234,6 +234,19 @@ public class ExtractedFieldsDetector {
|
|||
}
|
||||
}
|
||||
|
||||
private void checkFieldsWithCardinalityLimit() {
|
||||
for (Map.Entry<String, Long> entry : config.getAnalysis().getFieldCardinalityLimits().entrySet()) {
|
||||
String fieldName = entry.getKey();
|
||||
long limit = entry.getValue();
|
||||
long cardinality = fieldCardinalities.get(fieldName);
|
||||
if (cardinality > limit) {
|
||||
throw ExceptionsHelper.badRequestException(
|
||||
"Field [{}] must have at most [{}] distinct values but there were at least [{}]",
|
||||
fieldName, limit, cardinality);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private ExtractedFields detectExtractedFields(Set<String> fields) {
|
||||
List<String> sortedFields = new ArrayList<>(fields);
|
||||
// We sort the fields to ensure the checksum for each document is deterministic
|
||||
|
|
|
@ -0,0 +1,189 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.ml.dataframe.extractor;
|
||||
|
||||
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
|
||||
import org.elasticsearch.ResourceNotFoundException;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
|
||||
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
|
||||
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesAction;
|
||||
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest;
|
||||
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
|
||||
import org.elasticsearch.action.search.SearchAction;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.IndexNotFoundException;
|
||||
import org.elasticsearch.index.IndexSettings;
|
||||
import org.elasticsearch.search.aggregations.AggregationBuilders;
|
||||
import org.elasticsearch.search.aggregations.Aggregations;
|
||||
import org.elasticsearch.search.aggregations.metrics.Cardinality;
|
||||
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
||||
import org.elasticsearch.xpack.core.ClientHelper;
|
||||
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
|
||||
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
/**
|
||||
* A factory that retrieves all the parts necessary to build a {@link ExtractedFieldsDetector}.
|
||||
*/
|
||||
public class ExtractedFieldsDetectorFactory {
|
||||
|
||||
private final Client client;
|
||||
|
||||
public ExtractedFieldsDetectorFactory(Client client) {
|
||||
this.client = Objects.requireNonNull(client);
|
||||
}
|
||||
|
||||
public void createFromSource(DataFrameAnalyticsConfig config, boolean isTaskRestarting,
|
||||
ActionListener<ExtractedFieldsDetector> listener) {
|
||||
create(config.getSource().getIndex(), config, isTaskRestarting, listener);
|
||||
}
|
||||
|
||||
public void createFromDest(DataFrameAnalyticsConfig config, boolean isTaskRestarting,
|
||||
ActionListener<ExtractedFieldsDetector> listener) {
|
||||
create(new String[] {config.getDest().getIndex()}, config, isTaskRestarting, listener);
|
||||
}
|
||||
|
||||
private void create(String[] index, DataFrameAnalyticsConfig config, boolean isTaskRestarting,
|
||||
ActionListener<ExtractedFieldsDetector> listener) {
|
||||
AtomicInteger docValueFieldsLimitHolder = new AtomicInteger();
|
||||
AtomicReference<FieldCapabilitiesResponse> fieldCapsResponseHolder = new AtomicReference<>();
|
||||
|
||||
// Step 4. Create cardinality by field map and build detector
|
||||
ActionListener<Map<String, Long>> fieldCardinalitiesHandler = ActionListener.wrap(
|
||||
fieldCardinalities -> {
|
||||
ExtractedFieldsDetector detector = new ExtractedFieldsDetector(index, config, isTaskRestarting,
|
||||
docValueFieldsLimitHolder.get(), fieldCapsResponseHolder.get(), fieldCardinalities);
|
||||
listener.onResponse(detector);
|
||||
},
|
||||
listener::onFailure
|
||||
);
|
||||
|
||||
// Step 3. Get cardinalities for fields with limits
|
||||
ActionListener<FieldCapabilitiesResponse> fieldCapabilitiesHandler = ActionListener.wrap(
|
||||
fieldCapabilitiesResponse -> {
|
||||
fieldCapsResponseHolder.set(fieldCapabilitiesResponse);
|
||||
getCardinalitiesForFieldsWithLimit(index, config, fieldCardinalitiesHandler);
|
||||
},
|
||||
listener::onFailure
|
||||
);
|
||||
|
||||
// Step 2. Get field capabilities necessary to build the information of how to extract fields
|
||||
ActionListener<Integer> docValueFieldsLimitListener = ActionListener.wrap(
|
||||
docValueFieldsLimit -> {
|
||||
docValueFieldsLimitHolder.set(docValueFieldsLimit);
|
||||
getFieldCaps(index, config, fieldCapabilitiesHandler);
|
||||
},
|
||||
listener::onFailure
|
||||
);
|
||||
|
||||
// Step 1. Get doc value fields limit
|
||||
getDocValueFieldsLimit(index, docValueFieldsLimitListener);
|
||||
}
|
||||
|
||||
private void getCardinalitiesForFieldsWithLimit(String[] index, DataFrameAnalyticsConfig config,
|
||||
ActionListener<Map<String, Long>> listener) {
|
||||
Map<String, Long> fieldCardinalityLimits = config.getAnalysis().getFieldCardinalityLimits();
|
||||
if (fieldCardinalityLimits.isEmpty()) {
|
||||
listener.onResponse(Collections.emptyMap());
|
||||
return;
|
||||
}
|
||||
|
||||
ActionListener<SearchResponse> searchListener = ActionListener.wrap(
|
||||
searchResponse -> buildFieldCardinalitiesMap(config, searchResponse, listener),
|
||||
listener::onFailure
|
||||
);
|
||||
|
||||
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().size(0);
|
||||
for (Map.Entry<String, Long> entry : fieldCardinalityLimits.entrySet()) {
|
||||
String fieldName = entry.getKey();
|
||||
Long limit = entry.getValue();
|
||||
searchSourceBuilder.aggregation(
|
||||
AggregationBuilders.cardinality(fieldName)
|
||||
.field(fieldName)
|
||||
.precisionThreshold(limit + 1));
|
||||
}
|
||||
SearchRequest searchRequest = new SearchRequest(index).source(searchSourceBuilder);
|
||||
ClientHelper.executeWithHeadersAsync(
|
||||
config.getHeaders(), ClientHelper.ML_ORIGIN, client, SearchAction.INSTANCE, searchRequest, searchListener);
|
||||
}
|
||||
|
||||
private void buildFieldCardinalitiesMap(DataFrameAnalyticsConfig config, SearchResponse searchResponse,
|
||||
ActionListener<Map<String, Long>> listener) {
|
||||
Aggregations aggs = searchResponse.getAggregations();
|
||||
if (aggs == null) {
|
||||
listener.onFailure(ExceptionsHelper.serverError("Unexpected null response when gathering field cardinalities"));
|
||||
return;
|
||||
}
|
||||
|
||||
Map<String, Long> fieldCardinalities = new HashMap<>(config.getAnalysis().getFieldCardinalityLimits().size());
|
||||
for (String field : config.getAnalysis().getFieldCardinalityLimits().keySet()) {
|
||||
Cardinality cardinality = aggs.get(field);
|
||||
if (cardinality == null) {
|
||||
listener.onFailure(ExceptionsHelper.serverError("Unexpected null response when gathering field cardinalities"));
|
||||
return;
|
||||
}
|
||||
fieldCardinalities.put(field, cardinality.getValue());
|
||||
}
|
||||
listener.onResponse(fieldCardinalities);
|
||||
}
|
||||
|
||||
private void getFieldCaps(String[] index, DataFrameAnalyticsConfig config, ActionListener<FieldCapabilitiesResponse> listener) {
|
||||
FieldCapabilitiesRequest fieldCapabilitiesRequest = new FieldCapabilitiesRequest();
|
||||
fieldCapabilitiesRequest.indices(index);
|
||||
fieldCapabilitiesRequest.indicesOptions(IndicesOptions.lenientExpandOpen());
|
||||
fieldCapabilitiesRequest.fields("*");
|
||||
ClientHelper.executeWithHeaders(config.getHeaders(), ClientHelper.ML_ORIGIN, client, () -> {
|
||||
client.execute(FieldCapabilitiesAction.INSTANCE, fieldCapabilitiesRequest, listener);
|
||||
// This response gets discarded - the listener handles the real response
|
||||
return null;
|
||||
});
|
||||
}
|
||||
|
||||
private void getDocValueFieldsLimit(String[] index, ActionListener<Integer> docValueFieldsLimitListener) {
|
||||
ActionListener<GetSettingsResponse> settingsListener = ActionListener.wrap(getSettingsResponse -> {
|
||||
Integer minDocValueFieldsLimit = Integer.MAX_VALUE;
|
||||
|
||||
ImmutableOpenMap<String, Settings> indexToSettings = getSettingsResponse.getIndexToSettings();
|
||||
Iterator<ObjectObjectCursor<String, Settings>> iterator = indexToSettings.iterator();
|
||||
while (iterator.hasNext()) {
|
||||
ObjectObjectCursor<String, Settings> indexSettings = iterator.next();
|
||||
Integer indexMaxDocValueFields = IndexSettings.MAX_DOCVALUE_FIELDS_SEARCH_SETTING.get(indexSettings.value);
|
||||
if (indexMaxDocValueFields < minDocValueFieldsLimit) {
|
||||
minDocValueFieldsLimit = indexMaxDocValueFields;
|
||||
}
|
||||
}
|
||||
docValueFieldsLimitListener.onResponse(minDocValueFieldsLimit);
|
||||
},
|
||||
e -> {
|
||||
if (ExceptionsHelper.unwrapCause(e) instanceof IndexNotFoundException) {
|
||||
docValueFieldsLimitListener.onFailure(new ResourceNotFoundException("cannot retrieve data because index "
|
||||
+ ((IndexNotFoundException) e).getIndex() + " does not exist"));
|
||||
} else {
|
||||
docValueFieldsLimitListener.onFailure(e);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
GetSettingsRequest getSettingsRequest = new GetSettingsRequest();
|
||||
getSettingsRequest.indices(index);
|
||||
getSettingsRequest.includeDefaults(true);
|
||||
getSettingsRequest.names(IndexSettings.MAX_DOCVALUE_FIELDS_SEARCH_SETTING.getKey());
|
||||
client.admin().indices().getSettings(getSettingsRequest, settingsListener);
|
||||
}
|
||||
}
|
|
@ -47,7 +47,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
|
|||
.addAggregatableField("some_float", "float").build();
|
||||
|
||||
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
|
||||
SOURCE_INDEX, buildOutlierDetectionConfig(), RESULTS_FIELD, false, 100, fieldCapabilities);
|
||||
SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities, Collections.emptyMap());
|
||||
ExtractedFields extractedFields = extractedFieldsDetector.detect();
|
||||
|
||||
List<ExtractedField> allFields = extractedFields.getAllFields();
|
||||
|
@ -62,7 +62,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
|
|||
.build();
|
||||
|
||||
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
|
||||
SOURCE_INDEX, buildOutlierDetectionConfig(), RESULTS_FIELD, false, 100, fieldCapabilities);
|
||||
SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities, Collections.emptyMap());
|
||||
ExtractedFields extractedFields = extractedFieldsDetector.detect();
|
||||
|
||||
List<ExtractedField> allFields = extractedFields.getAllFields();
|
||||
|
@ -76,7 +76,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
|
|||
.addAggregatableField("some_keyword", "keyword").build();
|
||||
|
||||
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
|
||||
SOURCE_INDEX, buildOutlierDetectionConfig(), RESULTS_FIELD, false, 100, fieldCapabilities);
|
||||
SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities, Collections.emptyMap());
|
||||
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect());
|
||||
|
||||
assertThat(e.getMessage(), equalTo("No compatible fields could be detected in index [source_index]." +
|
||||
|
@ -88,7 +88,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
|
|||
.addAggregatableField("indecisive_field", "float", "keyword").build();
|
||||
|
||||
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
|
||||
SOURCE_INDEX, buildOutlierDetectionConfig(), RESULTS_FIELD, false, 100, fieldCapabilities);
|
||||
SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities, Collections.emptyMap());
|
||||
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect());
|
||||
|
||||
assertThat(e.getMessage(), equalTo("No compatible fields could be detected in index [source_index]. " +
|
||||
|
@ -104,7 +104,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
|
|||
.build();
|
||||
|
||||
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
|
||||
SOURCE_INDEX, buildOutlierDetectionConfig(), RESULTS_FIELD, false, 100, fieldCapabilities);
|
||||
SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities, Collections.emptyMap());
|
||||
ExtractedFields extractedFields = extractedFieldsDetector.detect();
|
||||
|
||||
List<ExtractedField> allFields = extractedFields.getAllFields();
|
||||
|
@ -125,7 +125,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
|
|||
.build();
|
||||
|
||||
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
|
||||
SOURCE_INDEX, buildRegressionConfig("foo"), RESULTS_FIELD, false, 100, fieldCapabilities);
|
||||
SOURCE_INDEX, buildRegressionConfig("foo"), false, 100, fieldCapabilities, Collections.emptyMap());
|
||||
ExtractedFields extractedFields = extractedFieldsDetector.detect();
|
||||
|
||||
List<ExtractedField> allFields = extractedFields.getAllFields();
|
||||
|
@ -144,7 +144,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
|
|||
.build();
|
||||
|
||||
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
|
||||
SOURCE_INDEX, buildRegressionConfig("foo"), RESULTS_FIELD, false, 100, fieldCapabilities);
|
||||
SOURCE_INDEX, buildRegressionConfig("foo"), false, 100, fieldCapabilities, Collections.emptyMap());
|
||||
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect());
|
||||
|
||||
assertThat(e.getMessage(), equalTo("required field [foo] is missing; analysis requires fields [foo]"));
|
||||
|
@ -160,7 +160,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
|
|||
FetchSourceContext analyzedFields = new FetchSourceContext(true, new String[0], new String[] {"foo"});
|
||||
|
||||
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
|
||||
SOURCE_INDEX, buildRegressionConfig("foo", analyzedFields), RESULTS_FIELD, false, 100, fieldCapabilities);
|
||||
SOURCE_INDEX, buildRegressionConfig("foo", analyzedFields), false, 100, fieldCapabilities, Collections.emptyMap());
|
||||
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect());
|
||||
|
||||
assertThat(e.getMessage(), equalTo("required field [foo] is missing; analysis requires fields [foo]"));
|
||||
|
@ -176,7 +176,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
|
|||
FetchSourceContext analyzedFields = new FetchSourceContext(true, new String[] {"some_float", "some_keyword"}, new String[0]);
|
||||
|
||||
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
|
||||
SOURCE_INDEX, buildRegressionConfig("foo", analyzedFields), RESULTS_FIELD, false, 100, fieldCapabilities);
|
||||
SOURCE_INDEX, buildRegressionConfig("foo", analyzedFields), false, 100, fieldCapabilities, Collections.emptyMap());
|
||||
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect());
|
||||
|
||||
assertThat(e.getMessage(), equalTo("required field [foo] is missing; analysis requires fields [foo]"));
|
||||
|
@ -190,7 +190,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
|
|||
FetchSourceContext analyzedFields = new FetchSourceContext(true, new String[] {"foo", "bar"}, new String[] {"foo"});
|
||||
|
||||
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
|
||||
SOURCE_INDEX, buildOutlierDetectionConfig(analyzedFields), RESULTS_FIELD, false, 100, fieldCapabilities);
|
||||
SOURCE_INDEX, buildOutlierDetectionConfig(analyzedFields), false, 100, fieldCapabilities, Collections.emptyMap());
|
||||
ExtractedFields extractedFields = extractedFieldsDetector.detect();
|
||||
|
||||
List<ExtractedField> allFields = extractedFields.getAllFields();
|
||||
|
@ -207,7 +207,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
|
|||
.build();
|
||||
|
||||
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
|
||||
SOURCE_INDEX, buildRegressionConfig("foo"), RESULTS_FIELD, false, 100, fieldCapabilities);
|
||||
SOURCE_INDEX, buildRegressionConfig("foo"), false, 100, fieldCapabilities, Collections.emptyMap());
|
||||
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect());
|
||||
|
||||
assertThat(e.getMessage(), equalTo("invalid types [keyword] for required field [foo]; " +
|
||||
|
@ -223,19 +223,33 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
|
|||
.build();
|
||||
|
||||
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
|
||||
SOURCE_INDEX, buildClassificationConfig("some_float"), RESULTS_FIELD, false, 100, fieldCapabilities);
|
||||
SOURCE_INDEX, buildClassificationConfig("some_float"), false, 100, fieldCapabilities, Collections.emptyMap());
|
||||
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect());
|
||||
|
||||
assertThat(e.getMessage(), equalTo("invalid types [float] for required field [some_float]; " +
|
||||
"expected types are [boolean, byte, integer, ip, keyword, long, short, text]"));
|
||||
}
|
||||
|
||||
public void testDetect_GivenClassificationAndDependentVariableHasInvalidCardinality() {
|
||||
FieldCapabilitiesResponse fieldCapabilities = new MockFieldCapsResponseBuilder()
|
||||
.addAggregatableField("some_long", "long")
|
||||
.addAggregatableField("some_keyword", "keyword")
|
||||
.addAggregatableField("foo", "keyword")
|
||||
.build();
|
||||
|
||||
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(SOURCE_INDEX,
|
||||
buildClassificationConfig("some_keyword"), false, 100, fieldCapabilities, Collections.singletonMap("some_keyword", 3L));
|
||||
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect());
|
||||
|
||||
assertThat(e.getMessage(), equalTo("Field [some_keyword] must have at most [2] distinct values but there were at least [3]"));
|
||||
}
|
||||
|
||||
public void testDetect_GivenIgnoredField() {
|
||||
FieldCapabilitiesResponse fieldCapabilities = new MockFieldCapsResponseBuilder()
|
||||
.addAggregatableField("_id", "float").build();
|
||||
|
||||
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
|
||||
SOURCE_INDEX, buildOutlierDetectionConfig(), RESULTS_FIELD, false, 100, fieldCapabilities);
|
||||
SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities, Collections.emptyMap());
|
||||
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect());
|
||||
|
||||
assertThat(e.getMessage(), equalTo("No compatible fields could be detected in index [source_index]. " +
|
||||
|
@ -248,7 +262,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
|
|||
FetchSourceContext analyzedFields = new FetchSourceContext(true, new String[]{"_id"}, new String[0]);
|
||||
|
||||
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
|
||||
SOURCE_INDEX, buildOutlierDetectionConfig(analyzedFields), RESULTS_FIELD, false, 100, fieldCapabilities);
|
||||
SOURCE_INDEX, buildOutlierDetectionConfig(analyzedFields), false, 100, fieldCapabilities, Collections.emptyMap());
|
||||
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect());
|
||||
|
||||
assertThat(e.getMessage(), equalTo("field [_id] cannot be analyzed"));
|
||||
|
@ -270,7 +284,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
|
|||
FieldCapabilitiesResponse fieldCapabilities = mockFieldCapsResponseBuilder.build();
|
||||
|
||||
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
|
||||
SOURCE_INDEX, buildOutlierDetectionConfig(), RESULTS_FIELD, false, 100, fieldCapabilities);
|
||||
SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities, Collections.emptyMap());
|
||||
ExtractedFields extractedFields = extractedFieldsDetector.detect();
|
||||
|
||||
List<String> extractedFieldNames = extractedFields.getAllFields().stream().map(ExtractedField::getName)
|
||||
|
@ -287,7 +301,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
|
|||
FetchSourceContext desiredFields = new FetchSourceContext(true, new String[]{"your_field1", "my*"}, new String[0]);
|
||||
|
||||
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
|
||||
SOURCE_INDEX, buildOutlierDetectionConfig(desiredFields), RESULTS_FIELD, false, 100, fieldCapabilities);
|
||||
SOURCE_INDEX, buildOutlierDetectionConfig(desiredFields), false, 100, fieldCapabilities, Collections.emptyMap());
|
||||
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect());
|
||||
|
||||
assertThat(e.getMessage(), equalTo("No field [your_field1] could be detected"));
|
||||
|
@ -302,7 +316,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
|
|||
FetchSourceContext desiredFields = new FetchSourceContext(true, new String[0], new String[]{"my_*"});
|
||||
|
||||
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
|
||||
SOURCE_INDEX, buildOutlierDetectionConfig(desiredFields), RESULTS_FIELD, false, 100, fieldCapabilities);
|
||||
SOURCE_INDEX, buildOutlierDetectionConfig(desiredFields), false, 100, fieldCapabilities, Collections.emptyMap());
|
||||
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect());
|
||||
assertThat(e.getMessage(), equalTo("No compatible fields could be detected in index [source_index]. " +
|
||||
"Supported types are [boolean, byte, double, float, half_float, integer, long, scaled_float, short]."));
|
||||
|
@ -318,7 +332,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
|
|||
FetchSourceContext desiredFields = new FetchSourceContext(true, new String[]{"your*", "my_*"}, new String[]{"*nope"});
|
||||
|
||||
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
|
||||
SOURCE_INDEX, buildOutlierDetectionConfig(desiredFields), RESULTS_FIELD, false, 100, fieldCapabilities);
|
||||
SOURCE_INDEX, buildOutlierDetectionConfig(desiredFields), false, 100, fieldCapabilities, Collections.emptyMap());
|
||||
ExtractedFields extractedFields = extractedFieldsDetector.detect();
|
||||
|
||||
List<String> extractedFieldNames = extractedFields.getAllFields().stream().map(ExtractedField::getName)
|
||||
|
@ -337,7 +351,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
|
|||
FetchSourceContext desiredFields = new FetchSourceContext(true, new String[]{"your*", "my_*"}, new String[]{"*nope"});
|
||||
|
||||
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
|
||||
SOURCE_INDEX, buildOutlierDetectionConfig(desiredFields), RESULTS_FIELD, false, 100, fieldCapabilities);
|
||||
SOURCE_INDEX, buildOutlierDetectionConfig(desiredFields), false, 100, fieldCapabilities, Collections.emptyMap());
|
||||
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect());
|
||||
|
||||
assertThat(e.getMessage(), equalTo("field [your_keyword] has unsupported type [keyword]. " +
|
||||
|
@ -353,7 +367,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
|
|||
.build();
|
||||
|
||||
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
|
||||
SOURCE_INDEX, buildOutlierDetectionConfig(), RESULTS_FIELD, false, 100, fieldCapabilities);
|
||||
SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities, Collections.emptyMap());
|
||||
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect());
|
||||
|
||||
assertThat(e.getMessage(), equalTo("A field that matches the dest.results_field [ml] already exists; " +
|
||||
|
@ -369,7 +383,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
|
|||
.build();
|
||||
|
||||
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
|
||||
SOURCE_INDEX, buildOutlierDetectionConfig(), RESULTS_FIELD, true, 100, fieldCapabilities);
|
||||
SOURCE_INDEX, buildOutlierDetectionConfig(), true, 100, fieldCapabilities, Collections.emptyMap());
|
||||
ExtractedFields extractedFields = extractedFieldsDetector.detect();
|
||||
|
||||
List<String> extractedFieldNames = extractedFields.getAllFields().stream().map(ExtractedField::getName)
|
||||
|
@ -387,7 +401,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
|
|||
FetchSourceContext analyzedFields = new FetchSourceContext(true, new String[]{RESULTS_FIELD}, new String[0]);
|
||||
|
||||
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
|
||||
SOURCE_INDEX, buildOutlierDetectionConfig(analyzedFields), RESULTS_FIELD, false, 100, fieldCapabilities);
|
||||
SOURCE_INDEX, buildOutlierDetectionConfig(analyzedFields), false, 100, fieldCapabilities, Collections.emptyMap());
|
||||
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect());
|
||||
|
||||
assertThat(e.getMessage(), equalTo("A field that matches the dest.results_field [ml] already exists; " +
|
||||
|
@ -404,29 +418,12 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
|
|||
FetchSourceContext analyzedFields = new FetchSourceContext(true, new String[]{RESULTS_FIELD}, new String[0]);
|
||||
|
||||
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
|
||||
SOURCE_INDEX, buildOutlierDetectionConfig(analyzedFields), RESULTS_FIELD, true, 100, fieldCapabilities);
|
||||
SOURCE_INDEX, buildOutlierDetectionConfig(analyzedFields), true, 100, fieldCapabilities, Collections.emptyMap());
|
||||
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect());
|
||||
|
||||
assertThat(e.getMessage(), equalTo("No field [ml] could be detected"));
|
||||
}
|
||||
|
||||
public void testDetect_NullResultsField() {
|
||||
FieldCapabilitiesResponse fieldCapabilities = new MockFieldCapsResponseBuilder()
|
||||
.addAggregatableField(RESULTS_FIELD, "float")
|
||||
.addAggregatableField("my_field1", "float")
|
||||
.addAggregatableField("your_field2", "float")
|
||||
.addAggregatableField("your_keyword", "keyword")
|
||||
.build();
|
||||
|
||||
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
|
||||
SOURCE_INDEX, buildOutlierDetectionConfig(), null, false, 100, fieldCapabilities);
|
||||
ExtractedFields extractedFields = extractedFieldsDetector.detect();
|
||||
|
||||
List<String> extractedFieldNames = extractedFields.getAllFields().stream().map(ExtractedField::getName)
|
||||
.collect(Collectors.toList());
|
||||
assertThat(extractedFieldNames, equalTo(Arrays.asList(RESULTS_FIELD, "my_field1", "your_field2")));
|
||||
}
|
||||
|
||||
public void testDetect_GivenLessFieldsThanDocValuesLimit() {
|
||||
FieldCapabilitiesResponse fieldCapabilities = new MockFieldCapsResponseBuilder()
|
||||
.addAggregatableField("field_1", "float")
|
||||
|
@ -436,7 +433,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
|
|||
.build();
|
||||
|
||||
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
|
||||
SOURCE_INDEX, buildOutlierDetectionConfig(), RESULTS_FIELD, true, 4, fieldCapabilities);
|
||||
SOURCE_INDEX, buildOutlierDetectionConfig(), true, 4, fieldCapabilities, Collections.emptyMap());
|
||||
ExtractedFields extractedFields = extractedFieldsDetector.detect();
|
||||
|
||||
List<String> extractedFieldNames = extractedFields.getAllFields().stream().map(ExtractedField::getName)
|
||||
|
@ -455,7 +452,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
|
|||
.build();
|
||||
|
||||
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
|
||||
SOURCE_INDEX, buildOutlierDetectionConfig(), RESULTS_FIELD, true, 3, fieldCapabilities);
|
||||
SOURCE_INDEX, buildOutlierDetectionConfig(), true, 3, fieldCapabilities, Collections.emptyMap());
|
||||
ExtractedFields extractedFields = extractedFieldsDetector.detect();
|
||||
|
||||
List<String> extractedFieldNames = extractedFields.getAllFields().stream().map(ExtractedField::getName)
|
||||
|
@ -474,7 +471,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
|
|||
.build();
|
||||
|
||||
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
|
||||
SOURCE_INDEX, buildOutlierDetectionConfig(), RESULTS_FIELD, true, 2, fieldCapabilities);
|
||||
SOURCE_INDEX, buildOutlierDetectionConfig(), true, 2, fieldCapabilities, Collections.emptyMap());
|
||||
ExtractedFields extractedFields = extractedFieldsDetector.detect();
|
||||
|
||||
List<String> extractedFieldNames = extractedFields.getAllFields().stream().map(ExtractedField::getName)
|
||||
|
@ -490,7 +487,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
|
|||
.build();
|
||||
|
||||
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
|
||||
SOURCE_INDEX, buildOutlierDetectionConfig(), RESULTS_FIELD, false, 100, fieldCapabilities);
|
||||
SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities, Collections.emptyMap());
|
||||
ExtractedFields extractedFields = extractedFieldsDetector.detect();
|
||||
|
||||
List<ExtractedField> allFields = extractedFields.getAllFields();
|
||||
|
@ -515,7 +512,8 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
|
|||
.build();
|
||||
|
||||
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
|
||||
SOURCE_INDEX, buildClassificationConfig("some_boolean"), RESULTS_FIELD, false, 100, fieldCapabilities);
|
||||
SOURCE_INDEX, buildClassificationConfig("some_boolean"), false, 100, fieldCapabilities,
|
||||
Collections.singletonMap("some_boolean", 2L));
|
||||
ExtractedFields extractedFields = extractedFieldsDetector.detect();
|
||||
|
||||
List<ExtractedField> allFields = extractedFields.getAllFields();
|
||||
|
@ -547,7 +545,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
|
|||
.build();
|
||||
|
||||
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
|
||||
SOURCE_INDEX, buildRegressionConfig("a_float"), RESULTS_FIELD, true, 100, fieldCapabilities);
|
||||
SOURCE_INDEX, buildRegressionConfig("a_float"), true, 100, fieldCapabilities, Collections.emptyMap());
|
||||
ExtractedFields extractedFields = extractedFieldsDetector.detect();
|
||||
|
||||
assertThat(extractedFields.getAllFields().size(), equalTo(5));
|
||||
|
@ -564,7 +562,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
|
|||
.build();
|
||||
|
||||
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
|
||||
SOURCE_INDEX, buildClassificationConfig("field_1"), RESULTS_FIELD, true, 100, fieldCapabilities);
|
||||
SOURCE_INDEX, buildClassificationConfig("field_1"), true, 100, fieldCapabilities, Collections.singletonMap("field_1", 2L));
|
||||
ExtractedFields extractedFields = extractedFieldsDetector.detect();
|
||||
|
||||
assertThat(extractedFields.getAllFields().size(), equalTo(2));
|
||||
|
@ -581,7 +579,8 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
|
|||
.build();
|
||||
|
||||
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
|
||||
SOURCE_INDEX, buildClassificationConfig("field_1.keyword"), RESULTS_FIELD, true, 100, fieldCapabilities);
|
||||
SOURCE_INDEX, buildClassificationConfig("field_1.keyword"), true, 100, fieldCapabilities,
|
||||
Collections.singletonMap("field_1.keyword", 2L));
|
||||
ExtractedFields extractedFields = extractedFieldsDetector.detect();
|
||||
|
||||
assertThat(extractedFields.getAllFields().size(), equalTo(2));
|
||||
|
@ -600,7 +599,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
|
|||
.build();
|
||||
|
||||
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
|
||||
SOURCE_INDEX, buildRegressionConfig("field_2"), RESULTS_FIELD, true, 100, fieldCapabilities);
|
||||
SOURCE_INDEX, buildRegressionConfig("field_2"), true, 100, fieldCapabilities, Collections.emptyMap());
|
||||
ExtractedFields extractedFields = extractedFieldsDetector.detect();
|
||||
|
||||
assertThat(extractedFields.getAllFields().size(), equalTo(2));
|
||||
|
@ -617,7 +616,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
|
|||
.build();
|
||||
|
||||
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
|
||||
SOURCE_INDEX, buildRegressionConfig("field_2"), RESULTS_FIELD, true, 0, fieldCapabilities);
|
||||
SOURCE_INDEX, buildRegressionConfig("field_2"), true, 0, fieldCapabilities, Collections.emptyMap());
|
||||
ExtractedFields extractedFields = extractedFieldsDetector.detect();
|
||||
|
||||
assertThat(extractedFields.getAllFields().size(), equalTo(2));
|
||||
|
@ -635,7 +634,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
|
|||
.build();
|
||||
|
||||
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
|
||||
SOURCE_INDEX, buildRegressionConfig("field_2.double"), RESULTS_FIELD, true, 100, fieldCapabilities);
|
||||
SOURCE_INDEX, buildRegressionConfig("field_2.double"), true, 100, fieldCapabilities, Collections.emptyMap());
|
||||
ExtractedFields extractedFields = extractedFieldsDetector.detect();
|
||||
|
||||
assertThat(extractedFields.getAllFields().size(), equalTo(2));
|
||||
|
@ -652,7 +651,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
|
|||
.build();
|
||||
|
||||
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
|
||||
SOURCE_INDEX, buildRegressionConfig("field_2"), RESULTS_FIELD, true, 100, fieldCapabilities);
|
||||
SOURCE_INDEX, buildRegressionConfig("field_2"), true, 100, fieldCapabilities, Collections.emptyMap());
|
||||
ExtractedFields extractedFields = extractedFieldsDetector.detect();
|
||||
|
||||
assertThat(extractedFields.getAllFields().size(), equalTo(2));
|
||||
|
@ -670,7 +669,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
|
|||
FetchSourceContext analyzedFields = new FetchSourceContext(true, new String[] { "field_1", "field_2" }, new String[0]);
|
||||
|
||||
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
|
||||
SOURCE_INDEX, buildRegressionConfig("field_2", analyzedFields), RESULTS_FIELD, false, 100, fieldCapabilities);
|
||||
SOURCE_INDEX, buildRegressionConfig("field_2", analyzedFields), false, 100, fieldCapabilities, Collections.emptyMap());
|
||||
ExtractedFields extractedFields = extractedFieldsDetector.detect();
|
||||
|
||||
assertThat(extractedFields.getAllFields().size(), equalTo(2));
|
||||
|
|
Loading…
Reference in New Issue