[ML] fixing bug where analytics process starts with 0 rows (#45879) (#45988)

The native process requires a non-zero number of rows to analyze: if the flag --rows 0 is passed to the executable, it throws an error and does not start.

When building the configuration for the process, we should therefore check the row count and not start the native process at all if there are no rows.

This change also adds some logging to indicate what is occurring: a trace of the full process configuration, and an info message when no data is found and the process is skipped.
Benjamin Trent, 2019-08-26 14:18:17 -05:00 (committed by GitHub)
commit a3a4ae0ac2 (parent d64018f8e1)
5 changed files with 96 additions and 6 deletions
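
In brief, the fix adds two guards. Here is a condensed sketch of both (simplified from the diffs that follow, not verbatim; the surrounding variables come from the respective files):

```java
// Guard 1 (TransportStartDataFrameAnalyticsAction): reject the start request
// up front if the source indices contain no analyzable rows.
if (dataSummary.rows == 0) {
    finalListener.onFailure(new ElasticsearchStatusException(
        "Unable to start {} as there are no analyzable data in source indices [{}].",
        RestStatus.BAD_REQUEST, id, indices));
}

// Guard 2 (AnalyticsProcessManager): even if the data disappears between
// validation and launch, never hand the native process a config with zero rows.
if (analyticsProcessConfig.rows() == 0) {
    LOGGER.info("[{}] no data found to analyze. Will not start analytics native process.", config.getId());
    return false;
}
```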


@@ -143,6 +143,7 @@ integTest.runner {
           'ml/start_data_frame_analytics/Test start given source index has no compatible fields',
           'ml/start_data_frame_analytics/Test start with inconsistent body/param ids',
           'ml/start_data_frame_analytics/Test start given dest index is not empty',
+          'ml/start_data_frame_analytics/Test start with compatible fields but no data',
           'ml/start_stop_datafeed/Test start datafeed job, but not open',
           'ml/start_stop_datafeed/Test start non existing datafeed',
           'ml/start_stop_datafeed/Test stop non existing datafeed',


@@ -34,6 +34,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.routing.IndexRoutingTable;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.settings.Settings;
@@ -225,10 +226,41 @@ public class TransportStartDataFrameAnalyticsAction
     }

     private void getConfigAndValidate(String id, ActionListener<DataFrameAnalyticsConfig> finalListener) {
+        // Step 5. Validate that there are analyzable data in the source index
+        ActionListener<DataFrameAnalyticsConfig> validateMappingsMergeListener = ActionListener.wrap(
+            config -> DataFrameDataExtractorFactory.createForSourceIndices(client,
+                "validate_source_index_has_rows-" + id,
+                config,
+                ActionListener.wrap(
+                    dataFrameDataExtractorFactory ->
+                        dataFrameDataExtractorFactory
+                            .newExtractor(false)
+                            .collectDataSummaryAsync(ActionListener.wrap(
+                                dataSummary -> {
+                                    if (dataSummary.rows == 0) {
+                                        finalListener.onFailure(new ElasticsearchStatusException(
+                                            "Unable to start {} as there are no analyzable data in source indices [{}].",
+                                            RestStatus.BAD_REQUEST,
+                                            id,
+                                            Strings.arrayToCommaDelimitedString(config.getSource().getIndex())
+                                        ));
+                                    } else {
+                                        finalListener.onResponse(config);
+                                    }
+                                },
+                                finalListener::onFailure
+                            )),
+                    finalListener::onFailure
+                )),
+            finalListener::onFailure
+        );
+
         // Step 4. Validate mappings can be merged
         ActionListener<DataFrameAnalyticsConfig> toValidateMappingsListener = ActionListener.wrap(
             config -> MappingsMerger.mergeMappings(client, config.getHeaders(), config.getSource().getIndex(), ActionListener.wrap(
-                mappings -> finalListener.onResponse(config), finalListener::onFailure)),
+                mappings -> validateMappingsMergeListener.onResponse(config), finalListener::onFailure)),
             finalListener::onFailure
         );
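
A note on reading this hunk: the Step 5 listener is declared before Step 4 because the steps chain backwards; each step's success handler feeds the listener declared above it. A minimal sketch of the pattern (hypothetical names, not from this PR):

```java
// Hypothetical illustration of the backwards-declared listener chain.
// The later step is declared first so the earlier step can hand results to it.
ActionListener<Config> step2Listener = ActionListener.wrap(
    config -> validateRows(config, finalListener),   // runs second
    finalListener::onFailure);

ActionListener<Config> step1Listener = ActionListener.wrap(
    config -> mergeMappings(config, step2Listener),  // runs first, then hands off
    finalListener::onFailure);

fetchConfig(id, step1Listener);
```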


@@ -8,6 +8,7 @@ package org.elasticsearch.xpack.ml.dataframe.extractor;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.search.ClearScrollAction;
 import org.elasticsearch.action.search.ClearScrollRequest;
 import org.elasticsearch.action.search.SearchAction;
@@ -234,14 +235,33 @@ public class DataFrameDataExtractor {
     }

     public DataSummary collectDataSummary() {
-        SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE)
+        SearchRequestBuilder searchRequestBuilder = buildDataSummarySearchRequestBuilder();
+        SearchResponse searchResponse = executeSearchRequest(searchRequestBuilder);
+        return new DataSummary(searchResponse.getHits().getTotalHits().value, context.extractedFields.getAllFields().size());
+    }
+
+    public void collectDataSummaryAsync(ActionListener<DataSummary> dataSummaryActionListener) {
+        SearchRequestBuilder searchRequestBuilder = buildDataSummarySearchRequestBuilder();
+        final int numberOfFields = context.extractedFields.getAllFields().size();
+        ClientHelper.executeWithHeadersAsync(context.headers,
+            ClientHelper.ML_ORIGIN,
+            client,
+            SearchAction.INSTANCE,
+            searchRequestBuilder.request(),
+            ActionListener.wrap(
+                searchResponse -> dataSummaryActionListener.onResponse(
+                    new DataSummary(searchResponse.getHits().getTotalHits().value, numberOfFields)),
+                dataSummaryActionListener::onFailure
+            ));
+    }
+
+    private SearchRequestBuilder buildDataSummarySearchRequestBuilder() {
+        return new SearchRequestBuilder(client, SearchAction.INSTANCE)
             .setIndices(context.indices)
             .setSize(0)
             .setQuery(context.query)
             .setTrackTotalHits(true);
-        SearchResponse searchResponse = executeSearchRequest(searchRequestBuilder);
-        return new DataSummary(searchResponse.getHits().getTotalHits().value, context.extractedFields.getAllFields().size());
     }

     public Set<String> getCategoricalFields() {
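
The new collectDataSummaryAsync mirrors the existing synchronous collectDataSummary but runs the size-0, track-total-hits search via executeWithHeadersAsync, so no transport thread blocks while counting rows. A hedged usage sketch (the extractor and logger variables are assumed, not from this diff):

```java
// Hypothetical caller: count analyzable rows without blocking the calling thread.
extractor.collectDataSummaryAsync(ActionListener.wrap(
    dataSummary -> logger.info("found [{}] analyzable rows", dataSummary.rows),
    e -> logger.error("failed to collect data summary", e)));
```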


@@ -11,6 +11,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
 import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
 import org.elasticsearch.client.Client;
+import org.elasticsearch.common.Strings;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.core.ClientHelper;
 import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
@@ -273,7 +274,15 @@ public class AnalyticsProcessManager {
         }

         dataExtractor = dataExtractorFactory.newExtractor(false);
-        process = createProcess(task, createProcessConfig(config, dataExtractor));
+        AnalyticsProcessConfig analyticsProcessConfig = createProcessConfig(config, dataExtractor);
+        LOGGER.trace("[{}] creating analytics process with config [{}]", config.getId(), Strings.toString(analyticsProcessConfig));
+        // If we have no rows, that means there is no data so no point in starting the native process
+        // just finish the task
+        if (analyticsProcessConfig.rows() == 0) {
+            LOGGER.info("[{}] no data found to analyze. Will not start analytics native process.", config.getId());
+            return false;
+        }
+        process = createProcess(task, analyticsProcessConfig);
         DataFrameRowsJoiner dataFrameRowsJoiner = new DataFrameRowsJoiner(config.getId(), client,
             dataExtractorFactory.newExtractor(true));
         resultProcessor = new AnalyticsResultProcessor(id, dataFrameRowsJoiner, this::isProcessKilled, task.getProgressTracker());
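
This guard is deliberately redundant with the validation added in TransportStartDataFrameAnalyticsAction: documents could disappear between the start request being validated and the process being launched, so the manager re-checks the row count on the freshly built AnalyticsProcessConfig. Per the inline comment, returning false lets the caller finish the task without ever invoking the executable with --rows 0.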


@@ -62,6 +62,34 @@
         id: "foo"

 ---
+"Test start with compatible fields but no data":
+  - do:
+      indices.create:
+        index: empty-index-with-compatible-fields
+        body:
+          mappings:
+            properties:
+              long_field: { "type": "long" }
+
+  - do:
+      ml.put_data_frame_analytics:
+        id: "empty-with-compatible-fields"
+        body: >
+          {
+            "source": {
+              "index": "empty-index-with-compatible-fields"
+            },
+            "dest": {
+              "index": "empty-index-with-compatible-fields-dest"
+            },
+            "analysis": {"outlier_detection":{}}
+          }
+
+  - do:
+      catch: /Unable to start empty-with-compatible-fields as there are no analyzable data in source indices \[empty-index-with-compatible-fields\]/
+      ml.start_data_frame_analytics:
+        id: "empty-with-compatible-fields"
+---
 "Test start with inconsistent body/param ids":
   - do: