From 8a6675b9949e18e50193ab9345ced7cd2384a7db Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Fri, 2 Aug 2019 00:19:48 +0300 Subject: [PATCH] [7.x][ML] Check dest index is empty when starting DF analytics (#45094) (#45112) If one tries to start a DF analytics job that has already run, the result will be that the task will fail after reindexing the dest index from the source index. The results of the prior run will be gone and the task state is not properly set to failed with the failure reason. This commit improves the behavior in this scenario. First, we set the task state to `failed` in a set of failures that were missed. Second, a validation is added that if the destination index exists, it must be empty. --- .../ml/qa/ml-with-security/build.gradle | 1 + ...ransportStartDataFrameAnalyticsAction.java | 55 +++++++++++++++++-- .../dataframe/DataFrameAnalyticsManager.java | 12 ++-- .../process/AnalyticsProcessManager.java | 12 +--- .../test/ml/start_data_frame_analytics.yml | 40 ++++++++++++++ 5 files changed, 98 insertions(+), 22 deletions(-) diff --git a/x-pack/plugin/ml/qa/ml-with-security/build.gradle b/x-pack/plugin/ml/qa/ml-with-security/build.gradle index 1f1f48c2336..2fa1d8d4098 100644 --- a/x-pack/plugin/ml/qa/ml-with-security/build.gradle +++ b/x-pack/plugin/ml/qa/ml-with-security/build.gradle @@ -139,6 +139,7 @@ integTest.runner { 'ml/start_data_frame_analytics/Test start given missing source index', 'ml/start_data_frame_analytics/Test start given source index has no compatible fields', 'ml/start_data_frame_analytics/Test start with inconsistent body/param ids', + 'ml/start_data_frame_analytics/Test start given dest index is not empty', 'ml/start_stop_datafeed/Test start datafeed job, but not open', 'ml/start_stop_datafeed/Test start non existing datafeed', 'ml/start_stop_datafeed/Test stop non existing datafeed', diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java index ca90e90f260..8835de2e228 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml.action; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ResourceAlreadyExistsException; @@ -14,6 +15,8 @@ import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse; +import org.elasticsearch.action.search.SearchAction; +import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.master.AcknowledgedResponse; @@ -31,6 +34,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.license.LicenseUtils; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.persistent.AllocatedPersistentTask; @@ -43,6 +47,7 @@ import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.XPackField; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.MlTasks; @@ -177,26 +182,57 @@ public class TransportStartDataFrameAnalyticsAction } private void getConfigAndValidate(String id, ActionListener finalListener) { - // Validate mappings can be merged - ActionListener firstValidationListener = ActionListener.wrap( + // Step 4. Validate mappings can be merged + ActionListener toValidateMappingsListener = ActionListener.wrap( config -> MappingsMerger.mergeMappings(client, config.getHeaders(), config.getSource().getIndex(), ActionListener.wrap( mappings -> finalListener.onResponse(config), finalListener::onFailure)), finalListener::onFailure ); - // Validate source and dest; check data extraction is possible + // Step 3. Validate dest index is empty + ActionListener toValidateDestEmptyListener = ActionListener.wrap( + config -> checkDestIndexIsEmptyIfExists(config, toValidateMappingsListener), + finalListener::onFailure + ); + + // Step 2. Validate source and dest; check data extraction is possible ActionListener getConfigListener = ActionListener.wrap( config -> { new SourceDestValidator(clusterService.state(), indexNameExpressionResolver).check(config); - DataFrameDataExtractorFactory.validateConfigAndSourceIndex(client, config, firstValidationListener); + DataFrameDataExtractorFactory.validateConfigAndSourceIndex(client, config, toValidateDestEmptyListener); }, finalListener::onFailure ); - // First, get the config + // Step 1. Get the config configProvider.get(id, getConfigListener); } + private void checkDestIndexIsEmptyIfExists(DataFrameAnalyticsConfig config, ActionListener listener) { + String destIndex = config.getDest().getIndex(); + SearchRequest destEmptySearch = new SearchRequest(destIndex); + destEmptySearch.source().size(0); + destEmptySearch.allowPartialSearchResults(false); + ClientHelper.executeWithHeadersAsync(config.getHeaders(), ClientHelper.ML_ORIGIN, client, SearchAction.INSTANCE, + destEmptySearch, ActionListener.wrap( + searchResponse -> { + if (searchResponse.getHits().getTotalHits().value > 0) { + listener.onFailure(ExceptionsHelper.badRequestException("dest index [{}] must be empty", destIndex)); + } else { + listener.onResponse(config); + } + }, + e -> { + if (e instanceof IndexNotFoundException) { + listener.onResponse(config); + } else { + listener.onFailure(e); + } + } + ) + ); + } + private void waitForAnalyticsStarted(PersistentTasksCustomMetaData.PersistentTask task, TimeValue timeout, ActionListener listener) { AnalyticsPredicate predicate = new AnalyticsPredicate(); @@ -372,6 +408,15 @@ public class TransportStartDataFrameAnalyticsAction LOGGER.debug("[{}] Reindex task was successfully cancelled", taskParams.getId()); } } + + public void updateState(DataFrameAnalyticsState state, @Nullable String reason) { + DataFrameAnalyticsTaskState newTaskState = new DataFrameAnalyticsTaskState(state, getAllocationId(), reason); + updatePersistentTaskState(newTaskState, ActionListener.wrap( + updatedTask -> LOGGER.info("[{}] Successfully update task state to [{}]", getParams().getId(), state), + e -> LOGGER.error(new ParameterizedMessage("[{}] Could not update task state to [{}]", + getParams().getId(), state), e) + )); + } } static List verifyIndicesPrimaryShardsAreActive(ClusterState clusterState, String... indexNames) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java index 28f277dc84b..31eab37fbf6 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java @@ -63,7 +63,7 @@ public class DataFrameAnalyticsManager { public void execute(DataFrameAnalyticsTask task, DataFrameAnalyticsState currentState) { ActionListener reindexingStateListener = ActionListener.wrap( config -> reindexDataframeAndStartAnalysis(task, config), - task::markAsFailed + error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage()) ); // With config in hand, determine action to take @@ -129,7 +129,7 @@ public class DataFrameAnalyticsManager { task.setReindexingTaskId(null); startAnalytics(task, config, false); }, - task::markAsFailed + error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage()) ); // Refresh to ensure copied index is fully searchable @@ -140,7 +140,7 @@ public class DataFrameAnalyticsManager { RefreshAction.INSTANCE, new RefreshRequest(config.getDest().getIndex()), refreshListener), - task::markAsFailed + error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage()) ); // Reindex @@ -196,15 +196,15 @@ public class DataFrameAnalyticsManager { updatedTask -> processManager.runJob(task, config, dataExtractorFactory, error -> { if (error != null) { - task.markAsFailed(error); + task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage()); } else { task.markAsCompleted(); } }), - task::markAsFailed + error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage()) )); }, - task::markAsFailed + error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage()) ); // TODO This could fail with errors. In that case we get stuck with the copied index. diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java index 242f96b953c..cb000a15496 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java @@ -8,7 +8,6 @@ package org.elasticsearch.xpack.ml.dataframe.process; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.refresh.RefreshAction; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.client.Client; @@ -17,7 +16,6 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState; -import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsTaskState; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.action.TransportStartDataFrameAnalyticsAction.DataFrameAnalyticsTask; @@ -114,7 +112,7 @@ public class AnalyticsProcessManager { finishHandler.accept(null); } else { LOGGER.error("[{}] Marking task failed; {}", config.getId(), processContext.getFailureReason()); - updateTaskState(task, DataFrameAnalyticsState.FAILED, processContext.getFailureReason()); + task.updateState(DataFrameAnalyticsState.FAILED, processContext.getFailureReason()); } } } @@ -176,14 +174,6 @@ public class AnalyticsProcessManager { }; } - private void updateTaskState(DataFrameAnalyticsTask task, DataFrameAnalyticsState state, @Nullable String reason) { - DataFrameAnalyticsTaskState newTaskState = new DataFrameAnalyticsTaskState(state, task.getAllocationId(), reason); - task.updatePersistentTaskState(newTaskState, ActionListener.wrap( - updatedTask -> LOGGER.info("[{}] Successfully update task state to [{}]", task.getParams().getId(), state), - e -> LOGGER.error(new ParameterizedMessage("[{}] Could not update task state to [{}]", task.getParams().getId(), state), e) - )); - } - @Nullable public Integer getProgressPercent(long allocationId) { ProcessContext processContext = processContextByAllocation.get(allocationId); diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/start_data_frame_analytics.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/start_data_frame_analytics.yml index 6a8c6d0e6ed..6417ef1e4c6 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/start_data_frame_analytics.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/start_data_frame_analytics.yml @@ -72,3 +72,43 @@ { "id": "body_id" } + +--- +"Test start given dest index is not empty": + + - do: + index: + index: non-empty-source + refresh: "" + body: > + { + "numeric": 42.0 + } + + - do: + index: + index: non-empty-dest + refresh: "" + body: > + { + "numeric": 42.0 + } + + - do: + ml.put_data_frame_analytics: + id: "start_given_empty_dest_index" + body: > + { + "source": { + "index": "non-empty-source" + }, + "dest": { + "index": "non-empty-dest" + }, + "analysis": {"outlier_detection":{}} + } + + - do: + catch: /dest index \[non-empty-dest\] must be empty/ + ml.start_data_frame_analytics: + id: "start_given_empty_dest_index"