diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java index e5afb04c953..cc51f0a7c1a 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java @@ -238,7 +238,6 @@ public class RegressionIT extends MlNativeDataFrameAnalyticsIntegTestCase { "Finished analysis"); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/47612") public void testStopAndRestart() throws Exception { initialize("regression_stop_and_restart"); diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RunDataFrameAnalyticsIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RunDataFrameAnalyticsIT.java index 7bac43cfad9..c3d27b09fde 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RunDataFrameAnalyticsIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RunDataFrameAnalyticsIT.java @@ -550,7 +550,6 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest "Stopped analytics"); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/47612") public void testOutlierDetectionStopAndRestart() throws Exception { String sourceIndex = "test-outlier-detection-stop-and-restart"; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java index fea9753314d..c53238dc425 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java @@ -15,9 +15,6 @@ import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.get.GetIndexAction; import org.elasticsearch.action.admin.indices.get.GetIndexRequest; import org.elasticsearch.action.admin.indices.get.GetIndexResponse; -import org.elasticsearch.action.admin.indices.refresh.RefreshAction; -import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; -import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; import org.elasticsearch.action.support.ContextPreservingActionListener; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.cluster.ClusterState; @@ -160,34 +157,18 @@ public class DataFrameAnalyticsManager { } // Reindexing is complete; start analytics - ActionListener refreshListener = ActionListener.wrap( + ActionListener reindexCompletedListener = ActionListener.wrap( refreshResponse -> { if (task.isStopping()) { LOGGER.debug("[{}] Stopping before starting analytics process", config.getId()); return; } task.setReindexingTaskId(null); - startAnalytics(task, config, false); - }, - error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage()) - ); - - // Refresh to ensure copied index is fully searchable - ActionListener reindexCompletedListener = ActionListener.wrap( - bulkResponse -> { - if (task.isStopping()) { - LOGGER.debug("[{}] Stopping before refreshing destination index", config.getId()); - return; - } task.setReindexingFinished(); auditor.info( config.getId(), Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_FINISHED_REINDEXING, config.getDest().getIndex())); - ClientHelper.executeAsyncWithOrigin(client, - ClientHelper.ML_ORIGIN, - RefreshAction.INSTANCE, - new RefreshRequest(config.getDest().getIndex()), - refreshListener); + startAnalytics(task, config, false); }, error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage()) ); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsTask.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsTask.java index d6be817804b..55f5ef6be53 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsTask.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsTask.java @@ -79,6 +79,7 @@ public class DataFrameAnalyticsTask extends AllocatedPersistentTask implements S } public void setReindexingTaskId(Long reindexingTaskId) { + LOGGER.debug("[{}] Setting reindexing task id to [{}] from [{}]", taskParams.getId(), reindexingTaskId, this.reindexingTaskId); this.reindexingTaskId = reindexingTaskId; } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractor.java index 6b6983ce739..1b887278c41 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractor.java @@ -237,7 +237,9 @@ public class DataFrameDataExtractor { public DataSummary collectDataSummary() { SearchRequestBuilder searchRequestBuilder = buildDataSummarySearchRequestBuilder(); SearchResponse searchResponse = executeSearchRequest(searchRequestBuilder); - return new DataSummary(searchResponse.getHits().getTotalHits().value, context.extractedFields.getAllFields().size()); + long rows = searchResponse.getHits().getTotalHits().value; + LOGGER.debug("[{}] Data summary rows [{}]", context.jobId, rows); + return new DataSummary(rows, context.extractedFields.getAllFields().size()); } public void collectDataSummaryAsync(ActionListener dataSummaryActionListener) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java index 485b9d9d605..85b40bd6493 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java @@ -81,6 +81,9 @@ public class AnalyticsProcessManager { return; } + // First we refresh the dest index to ensure data is searchable + refreshDest(config); + ProcessContext processContext = new ProcessContext(config.getId()); if (processContextByAllocation.putIfAbsent(task.getAllocationId(), processContext) != null) { finishHandler.accept(ExceptionsHelper.serverError("[" + processContext.id @@ -147,10 +150,12 @@ public class AnalyticsProcessManager { refreshDest(config); LOGGER.info("[{}] Result processor has completed", config.getId()); } catch (Exception e) { - String errorMsg = new ParameterizedMessage("[{}] Error while processing data [{}]", config.getId(), e.getMessage()) - .getFormattedMessage(); - LOGGER.error(errorMsg, e); - processContextByAllocation.get(task.getAllocationId()).setFailureReason(errorMsg); + if (task.isStopping() == false) { + String errorMsg = new ParameterizedMessage("[{}] Error while processing data [{}]", config.getId(), e.getMessage()) + .getFormattedMessage(); + LOGGER.error(errorMsg, e); + processContextByAllocation.get(task.getAllocationId()).setFailureReason(errorMsg); + } } finally { closeProcess(task);