diff --git a/x-pack/plugin/ml/qa/ml-with-security/build.gradle b/x-pack/plugin/ml/qa/ml-with-security/build.gradle index 1f1f48c2336..2fa1d8d4098 100644 --- a/x-pack/plugin/ml/qa/ml-with-security/build.gradle +++ b/x-pack/plugin/ml/qa/ml-with-security/build.gradle @@ -139,6 +139,7 @@ integTest.runner { 'ml/start_data_frame_analytics/Test start given missing source index', 'ml/start_data_frame_analytics/Test start given source index has no compatible fields', 'ml/start_data_frame_analytics/Test start with inconsistent body/param ids', + 'ml/start_data_frame_analytics/Test start given dest index is not empty', 'ml/start_stop_datafeed/Test start datafeed job, but not open', 'ml/start_stop_datafeed/Test start non existing datafeed', 'ml/start_stop_datafeed/Test stop non existing datafeed', diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java index ca90e90f260..8835de2e228 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml.action; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ResourceAlreadyExistsException; @@ -14,6 +15,8 @@ import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse; +import org.elasticsearch.action.search.SearchAction; +import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.master.AcknowledgedResponse; @@ -31,6 +34,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.license.LicenseUtils; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.persistent.AllocatedPersistentTask; @@ -43,6 +47,7 @@ import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.XPackField; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.MlTasks; @@ -177,26 +182,57 @@ public class TransportStartDataFrameAnalyticsAction } private void getConfigAndValidate(String id, ActionListener finalListener) { - // Validate mappings can be merged - ActionListener firstValidationListener = ActionListener.wrap( + // Step 4. Validate mappings can be merged + ActionListener toValidateMappingsListener = ActionListener.wrap( config -> MappingsMerger.mergeMappings(client, config.getHeaders(), config.getSource().getIndex(), ActionListener.wrap( mappings -> finalListener.onResponse(config), finalListener::onFailure)), finalListener::onFailure ); - // Validate source and dest; check data extraction is possible + // Step 3. Validate dest index is empty + ActionListener toValidateDestEmptyListener = ActionListener.wrap( + config -> checkDestIndexIsEmptyIfExists(config, toValidateMappingsListener), + finalListener::onFailure + ); + + // Step 2. Validate source and dest; check data extraction is possible ActionListener getConfigListener = ActionListener.wrap( config -> { new SourceDestValidator(clusterService.state(), indexNameExpressionResolver).check(config); - DataFrameDataExtractorFactory.validateConfigAndSourceIndex(client, config, firstValidationListener); + DataFrameDataExtractorFactory.validateConfigAndSourceIndex(client, config, toValidateDestEmptyListener); }, finalListener::onFailure ); - // First, get the config + // Step 1. Get the config configProvider.get(id, getConfigListener); } + private void checkDestIndexIsEmptyIfExists(DataFrameAnalyticsConfig config, ActionListener listener) { + String destIndex = config.getDest().getIndex(); + SearchRequest destEmptySearch = new SearchRequest(destIndex); + destEmptySearch.source().size(0); + destEmptySearch.allowPartialSearchResults(false); + ClientHelper.executeWithHeadersAsync(config.getHeaders(), ClientHelper.ML_ORIGIN, client, SearchAction.INSTANCE, + destEmptySearch, ActionListener.wrap( + searchResponse -> { + if (searchResponse.getHits().getTotalHits().value > 0) { + listener.onFailure(ExceptionsHelper.badRequestException("dest index [{}] must be empty", destIndex)); + } else { + listener.onResponse(config); + } + }, + e -> { + if (e instanceof IndexNotFoundException) { + listener.onResponse(config); + } else { + listener.onFailure(e); + } + } + ) + ); + } + private void waitForAnalyticsStarted(PersistentTasksCustomMetaData.PersistentTask task, TimeValue timeout, ActionListener listener) { AnalyticsPredicate predicate = new AnalyticsPredicate(); @@ -372,6 +408,15 @@ public class TransportStartDataFrameAnalyticsAction LOGGER.debug("[{}] Reindex task was successfully cancelled", taskParams.getId()); } } + + public void updateState(DataFrameAnalyticsState state, @Nullable String reason) { + DataFrameAnalyticsTaskState newTaskState = new DataFrameAnalyticsTaskState(state, getAllocationId(), reason); + updatePersistentTaskState(newTaskState, ActionListener.wrap( + updatedTask -> LOGGER.info("[{}] Successfully update task state to [{}]", getParams().getId(), state), + e -> LOGGER.error(new ParameterizedMessage("[{}] Could not update task state to [{}]", + getParams().getId(), state), e) + )); + } } static List verifyIndicesPrimaryShardsAreActive(ClusterState clusterState, String... indexNames) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java index 28f277dc84b..31eab37fbf6 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java @@ -63,7 +63,7 @@ public class DataFrameAnalyticsManager { public void execute(DataFrameAnalyticsTask task, DataFrameAnalyticsState currentState) { ActionListener reindexingStateListener = ActionListener.wrap( config -> reindexDataframeAndStartAnalysis(task, config), - task::markAsFailed + error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage()) ); // With config in hand, determine action to take @@ -129,7 +129,7 @@ public class DataFrameAnalyticsManager { task.setReindexingTaskId(null); startAnalytics(task, config, false); }, - task::markAsFailed + error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage()) ); // Refresh to ensure copied index is fully searchable @@ -140,7 +140,7 @@ public class DataFrameAnalyticsManager { RefreshAction.INSTANCE, new RefreshRequest(config.getDest().getIndex()), refreshListener), - task::markAsFailed + error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage()) ); // Reindex @@ -196,15 +196,15 @@ public class DataFrameAnalyticsManager { updatedTask -> processManager.runJob(task, config, dataExtractorFactory, error -> { if (error != null) { - task.markAsFailed(error); + task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage()); } else { task.markAsCompleted(); } }), - task::markAsFailed + error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage()) )); }, - task::markAsFailed + error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage()) ); // TODO This could fail with errors. In that case we get stuck with the copied index. diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java index 242f96b953c..cb000a15496 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java @@ -8,7 +8,6 @@ package org.elasticsearch.xpack.ml.dataframe.process; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.refresh.RefreshAction; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.client.Client; @@ -17,7 +16,6 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState; -import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsTaskState; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.action.TransportStartDataFrameAnalyticsAction.DataFrameAnalyticsTask; @@ -114,7 +112,7 @@ public class AnalyticsProcessManager { finishHandler.accept(null); } else { LOGGER.error("[{}] Marking task failed; {}", config.getId(), processContext.getFailureReason()); - updateTaskState(task, DataFrameAnalyticsState.FAILED, processContext.getFailureReason()); + task.updateState(DataFrameAnalyticsState.FAILED, processContext.getFailureReason()); } } } @@ -176,14 +174,6 @@ public class AnalyticsProcessManager { }; } - private void updateTaskState(DataFrameAnalyticsTask task, DataFrameAnalyticsState state, @Nullable String reason) { - DataFrameAnalyticsTaskState newTaskState = new DataFrameAnalyticsTaskState(state, task.getAllocationId(), reason); - task.updatePersistentTaskState(newTaskState, ActionListener.wrap( - updatedTask -> LOGGER.info("[{}] Successfully update task state to [{}]", task.getParams().getId(), state), - e -> LOGGER.error(new ParameterizedMessage("[{}] Could not update task state to [{}]", task.getParams().getId(), state), e) - )); - } - @Nullable public Integer getProgressPercent(long allocationId) { ProcessContext processContext = processContextByAllocation.get(allocationId); diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/start_data_frame_analytics.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/start_data_frame_analytics.yml index 6a8c6d0e6ed..6417ef1e4c6 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/start_data_frame_analytics.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/start_data_frame_analytics.yml @@ -72,3 +72,43 @@ { "id": "body_id" } + +--- +"Test start given dest index is not empty": + + - do: + index: + index: non-empty-source + refresh: "" + body: > + { + "numeric": 42.0 + } + + - do: + index: + index: non-empty-dest + refresh: "" + body: > + { + "numeric": 42.0 + } + + - do: + ml.put_data_frame_analytics: + id: "start_given_empty_dest_index" + body: > + { + "source": { + "index": "non-empty-source" + }, + "dest": { + "index": "non-empty-dest" + }, + "analysis": {"outlier_detection":{}} + } + + - do: + catch: /dest index \[non-empty-dest\] must be empty/ + ml.start_data_frame_analytics: + id: "start_given_empty_dest_index"