diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java index 27840e1cd94..c6ef0a26730 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java @@ -412,6 +412,7 @@ public class TransportStartDataFrameAnalyticsAction private volatile int maxMachineMemoryPercent; private volatile int maxLazyMLNodes; private volatile int maxOpenJobs; + private volatile ClusterState clusterState; public TaskExecutor(Settings settings, Client client, ClusterService clusterService, DataFrameAnalyticsManager manager, MlMemoryTracker memoryTracker) { @@ -427,6 +428,7 @@ public class TransportStartDataFrameAnalyticsAction .addSettingsUpdateConsumer(MachineLearning.MAX_MACHINE_MEMORY_PERCENT, this::setMaxMachineMemoryPercent); clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.MAX_LAZY_ML_NODES, this::setMaxLazyMLNodes); clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_OPEN_JOBS_PER_NODE, this::setMaxOpenJobs); + clusterService.addListener(event -> clusterState = event.state()); } @Override @@ -493,10 +495,10 @@ public class TransportStartDataFrameAnalyticsAction DataFrameAnalyticsTaskState startedState = new DataFrameAnalyticsTaskState(DataFrameAnalyticsState.STARTED, task.getAllocationId(), null); task.updatePersistentTaskState(startedState, ActionListener.wrap( - response -> manager.execute((DataFrameAnalyticsTask) task, DataFrameAnalyticsState.STARTED), + response -> manager.execute((DataFrameAnalyticsTask) task, DataFrameAnalyticsState.STARTED, clusterState), task::markAsFailed)); } else { - manager.execute((DataFrameAnalyticsTask)task, analyticsTaskState.getState()); + manager.execute((DataFrameAnalyticsTask) task, analyticsTaskState.getState(), clusterState); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java index c9e1604bf21..5c55e2f5696 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java @@ -19,6 +19,7 @@ import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; import org.elasticsearch.action.support.ContextPreservingActionListener; import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.reindex.BulkByScrollResponse; @@ -30,6 +31,7 @@ import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsTaskState; +import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.dataframe.extractor.DataFrameDataExtractorFactory; import org.elasticsearch.xpack.ml.dataframe.persistence.DataFrameAnalyticsConfigProvider; @@ -59,7 +61,7 @@ public class DataFrameAnalyticsManager { this.processManager = Objects.requireNonNull(processManager); } - public void execute(DataFrameAnalyticsTask task, DataFrameAnalyticsState currentState) { + public void execute(DataFrameAnalyticsTask task, DataFrameAnalyticsState currentState, ClusterState clusterState) { ActionListener reindexingStateListener = ActionListener.wrap( config -> reindexDataframeAndStartAnalysis(task, config), error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage()) @@ -112,7 +114,13 @@ public class DataFrameAnalyticsManager { ); // Retrieve configuration - configProvider.get(task.getParams().getId(), configListener); + ActionListener stateAliasListener = ActionListener.wrap( + aBoolean -> configProvider.get(task.getParams().getId(), configListener), + configListener::onFailure + ); + + // Make sure the state index and alias exist + AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessary(client, clusterState, stateAliasListener); } private void reindexDataframeAndStartAnalysis(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config) {