[7.x][ML] Check dest index is empty when starting DF analytics (#45094) (#45112)

If one tries to start a DF analytics job that has already run,
the task fails after reindexing the dest index from the source
index. The results of the prior run are gone, yet the task state
is not properly set to `failed` with the failure reason.

This commit improves the behavior in this scenario. First, we now
set the task state to `failed` on a number of failure paths that
were previously missed. Second, a validation is added that if the
destination index exists, it must be empty.
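
For readers skimming the diff: the heart of the new validation is a size-0 search
against the dest index; any hit rejects the start request, while a missing index
passes. Below is a simplified sketch with the ML-specific wiring stripped out —
the method name and the client, destIndex and listener parameters are stand-ins,
and a plain ElasticsearchStatusException stands in for the x-pack
ExceptionsHelper.badRequestException helper used in the real code.

import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.rest.RestStatus;

// Succeed only if the dest index is absent or contains zero documents.
static void checkDestIndexIsEmpty(Client client, String destIndex, ActionListener<Void> listener) {
    SearchRequest destEmptySearch = new SearchRequest(destIndex);
    destEmptySearch.source().size(0);                  // we only need the hit count, not documents
    destEmptySearch.allowPartialSearchResults(false);  // a partial result could under-count
    client.execute(SearchAction.INSTANCE, destEmptySearch, ActionListener.wrap(
        searchResponse -> {
            if (searchResponse.getHits().getTotalHits().value > 0) {
                listener.onFailure(new ElasticsearchStatusException(
                    "dest index [" + destIndex + "] must be empty", RestStatus.BAD_REQUEST));
            } else {
                listener.onResponse(null);
            }
        },
        e -> {
            if (e instanceof IndexNotFoundException) {
                listener.onResponse(null);   // dest index does not exist yet; nothing can be overwritten
            } else {
                listener.onFailure(e);
            }
        }
    ));
}

In the actual change the search additionally runs through
ClientHelper.executeWithHeadersAsync so that it executes with the headers stored
in the analytics config.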
Dimitris Athanasiou 2019-08-02 00:19:48 +03:00 committed by GitHub
parent c13285a382
commit 8a6675b994
5 changed files with 98 additions and 22 deletions

@@ -139,6 +139,7 @@ integTest.runner {
    'ml/start_data_frame_analytics/Test start given missing source index',
    'ml/start_data_frame_analytics/Test start given source index has no compatible fields',
    'ml/start_data_frame_analytics/Test start with inconsistent body/param ids',
+   'ml/start_data_frame_analytics/Test start given dest index is not empty',
    'ml/start_stop_datafeed/Test start datafeed job, but not open',
    'ml/start_stop_datafeed/Test start non existing datafeed',
    'ml/start_stop_datafeed/Test stop non existing datafeed',

@@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml.action;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceAlreadyExistsException;
@@ -14,6 +15,8 @@ import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
+import org.elasticsearch.action.search.SearchAction;
+import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
@@ -31,6 +34,7 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.persistent.AllocatedPersistentTask;
@@ -43,6 +47,7 @@ import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.XPackField;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.MlTasks;
@@ -177,26 +182,57 @@ public class TransportStartDataFrameAnalyticsAction
    }

    private void getConfigAndValidate(String id, ActionListener<DataFrameAnalyticsConfig> finalListener) {
-       // Validate mappings can be merged
-       ActionListener<DataFrameAnalyticsConfig> firstValidationListener = ActionListener.wrap(
+       // Step 4. Validate mappings can be merged
+       ActionListener<DataFrameAnalyticsConfig> toValidateMappingsListener = ActionListener.wrap(
            config -> MappingsMerger.mergeMappings(client, config.getHeaders(), config.getSource().getIndex(), ActionListener.wrap(
                mappings -> finalListener.onResponse(config), finalListener::onFailure)),
            finalListener::onFailure
        );

-       // Validate source and dest; check data extraction is possible
+       // Step 3. Validate dest index is empty
+       ActionListener<DataFrameAnalyticsConfig> toValidateDestEmptyListener = ActionListener.wrap(
+           config -> checkDestIndexIsEmptyIfExists(config, toValidateMappingsListener),
+           finalListener::onFailure
+       );
+
+       // Step 2. Validate source and dest; check data extraction is possible
        ActionListener<DataFrameAnalyticsConfig> getConfigListener = ActionListener.wrap(
            config -> {
                new SourceDestValidator(clusterService.state(), indexNameExpressionResolver).check(config);
-               DataFrameDataExtractorFactory.validateConfigAndSourceIndex(client, config, firstValidationListener);
+               DataFrameDataExtractorFactory.validateConfigAndSourceIndex(client, config, toValidateDestEmptyListener);
            },
            finalListener::onFailure
        );

-       // First, get the config
+       // Step 1. Get the config
        configProvider.get(id, getConfigListener);
    }

+   private void checkDestIndexIsEmptyIfExists(DataFrameAnalyticsConfig config, ActionListener<DataFrameAnalyticsConfig> listener) {
+       String destIndex = config.getDest().getIndex();
+       SearchRequest destEmptySearch = new SearchRequest(destIndex);
+       destEmptySearch.source().size(0);
+       destEmptySearch.allowPartialSearchResults(false);
+       ClientHelper.executeWithHeadersAsync(config.getHeaders(), ClientHelper.ML_ORIGIN, client, SearchAction.INSTANCE,
+           destEmptySearch, ActionListener.wrap(
+               searchResponse -> {
+                   if (searchResponse.getHits().getTotalHits().value > 0) {
+                       listener.onFailure(ExceptionsHelper.badRequestException("dest index [{}] must be empty", destIndex));
+                   } else {
+                       listener.onResponse(config);
+                   }
+               },
+               e -> {
+                   if (e instanceof IndexNotFoundException) {
+                       listener.onResponse(config);
+                   } else {
+                       listener.onFailure(e);
+                   }
+               }
+           )
+       );
+   }

    private void waitForAnalyticsStarted(PersistentTasksCustomMetaData.PersistentTask<StartDataFrameAnalyticsAction.TaskParams> task,
                                         TimeValue timeout, ActionListener<AcknowledgedResponse> listener) {
        AnalyticsPredicate predicate = new AnalyticsPredicate();
@@ -372,6 +408,15 @@ public class TransportStartDataFrameAnalyticsAction
                LOGGER.debug("[{}] Reindex task was successfully cancelled", taskParams.getId());
            }
        }

+       public void updateState(DataFrameAnalyticsState state, @Nullable String reason) {
+           DataFrameAnalyticsTaskState newTaskState = new DataFrameAnalyticsTaskState(state, getAllocationId(), reason);
+           updatePersistentTaskState(newTaskState, ActionListener.wrap(
+               updatedTask -> LOGGER.info("[{}] Successfully update task state to [{}]", getParams().getId(), state),
+               e -> LOGGER.error(new ParameterizedMessage("[{}] Could not update task state to [{}]",
+                   getParams().getId(), state), e)
+           ));
+       }
    }

    static List<String> verifyIndicesPrimaryShardsAreActive(ClusterState clusterState, String... indexNames) {

@@ -63,7 +63,7 @@ public class DataFrameAnalyticsManager {
    public void execute(DataFrameAnalyticsTask task, DataFrameAnalyticsState currentState) {
        ActionListener<DataFrameAnalyticsConfig> reindexingStateListener = ActionListener.wrap(
            config -> reindexDataframeAndStartAnalysis(task, config),
-           task::markAsFailed
+           error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage())
        );

        // With config in hand, determine action to take
@@ -129,7 +129,7 @@ public class DataFrameAnalyticsManager {
                task.setReindexingTaskId(null);
                startAnalytics(task, config, false);
            },
-           task::markAsFailed
+           error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage())
        );

        // Refresh to ensure copied index is fully searchable
@@ -140,7 +140,7 @@ public class DataFrameAnalyticsManager {
                RefreshAction.INSTANCE,
                new RefreshRequest(config.getDest().getIndex()),
                refreshListener),
-           task::markAsFailed
+           error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage())
        );

        // Reindex
@@ -196,15 +196,15 @@ public class DataFrameAnalyticsManager {
            updatedTask -> processManager.runJob(task, config, dataExtractorFactory,
                error -> {
                    if (error != null) {
-                       task.markAsFailed(error);
+                       task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage());
                    } else {
                        task.markAsCompleted();
                    }
                }),
-           task::markAsFailed
+           error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage())
        ));
        },
-       task::markAsFailed
+       error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage())
    );

    // TODO This could fail with errors. In that case we get stuck with the copied index.

@@ -8,7 +8,6 @@ package org.elasticsearch.xpack.ml.dataframe.process;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
-import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.client.Client;
@@ -17,7 +16,6 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
-import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsTaskState;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.action.TransportStartDataFrameAnalyticsAction.DataFrameAnalyticsTask;
@@ -114,7 +112,7 @@ public class AnalyticsProcessManager {
                finishHandler.accept(null);
            } else {
                LOGGER.error("[{}] Marking task failed; {}", config.getId(), processContext.getFailureReason());
-               updateTaskState(task, DataFrameAnalyticsState.FAILED, processContext.getFailureReason());
+               task.updateState(DataFrameAnalyticsState.FAILED, processContext.getFailureReason());
            }
        }
    }
@@ -176,14 +174,6 @@ public class AnalyticsProcessManager {
        };
    }

-   private void updateTaskState(DataFrameAnalyticsTask task, DataFrameAnalyticsState state, @Nullable String reason) {
-       DataFrameAnalyticsTaskState newTaskState = new DataFrameAnalyticsTaskState(state, task.getAllocationId(), reason);
-       task.updatePersistentTaskState(newTaskState, ActionListener.wrap(
-           updatedTask -> LOGGER.info("[{}] Successfully update task state to [{}]", task.getParams().getId(), state),
-           e -> LOGGER.error(new ParameterizedMessage("[{}] Could not update task state to [{}]", task.getParams().getId(), state), e)
-       ));
-   }

    @Nullable
    public Integer getProgressPercent(long allocationId) {
        ProcessContext processContext = processContextByAllocation.get(allocationId);

@@ -72,3 +72,43 @@
          {
            "id": "body_id"
          }
+
+---
+"Test start given dest index is not empty":
+  - do:
+      index:
+        index: non-empty-source
+        refresh: ""
+        body: >
+          {
+            "numeric": 42.0
+          }
+
+  - do:
+      index:
+        index: non-empty-dest
+        refresh: ""
+        body: >
+          {
+            "numeric": 42.0
+          }
+
+  - do:
+      ml.put_data_frame_analytics:
+        id: "start_given_empty_dest_index"
+        body: >
+          {
+            "source": {
+              "index": "non-empty-source"
+            },
+            "dest": {
+              "index": "non-empty-dest"
+            },
+            "analysis": {"outlier_detection":{}}
+          }
+
+  - do:
+      catch: /dest index \[non-empty-dest\] must be empty/
+      ml.start_data_frame_analytics:
+        id: "start_given_empty_dest_index"