[7.x][ML] Create state index and alias before starting an analytics job (#46602) (#46648)

This is fixing a bug where if an analytics job is started before any
anomaly detection job is opened, we create an index after the state
write alias.

Instead, we should create the state index and alias before starting
an analytics job and this commit makes sure this is the case.

Backport of #46602
This commit is contained in:
Dimitris Athanasiou 2019-09-13 10:34:12 +03:00 committed by GitHub
parent dae5b22bf8
commit 0bc8acaf5b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 14 additions and 4 deletions

View File

@ -412,6 +412,7 @@ public class TransportStartDataFrameAnalyticsAction
private volatile int maxMachineMemoryPercent;
private volatile int maxLazyMLNodes;
private volatile int maxOpenJobs;
private volatile ClusterState clusterState;
public TaskExecutor(Settings settings, Client client, ClusterService clusterService, DataFrameAnalyticsManager manager,
MlMemoryTracker memoryTracker) {
@ -427,6 +428,7 @@ public class TransportStartDataFrameAnalyticsAction
.addSettingsUpdateConsumer(MachineLearning.MAX_MACHINE_MEMORY_PERCENT, this::setMaxMachineMemoryPercent);
clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.MAX_LAZY_ML_NODES, this::setMaxLazyMLNodes);
clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_OPEN_JOBS_PER_NODE, this::setMaxOpenJobs);
clusterService.addListener(event -> clusterState = event.state());
}
@Override
@ -493,10 +495,10 @@ public class TransportStartDataFrameAnalyticsAction
DataFrameAnalyticsTaskState startedState = new DataFrameAnalyticsTaskState(DataFrameAnalyticsState.STARTED,
task.getAllocationId(), null);
task.updatePersistentTaskState(startedState, ActionListener.wrap(
response -> manager.execute((DataFrameAnalyticsTask) task, DataFrameAnalyticsState.STARTED),
response -> manager.execute((DataFrameAnalyticsTask) task, DataFrameAnalyticsState.STARTED, clusterState),
task::markAsFailed));
} else {
manager.execute((DataFrameAnalyticsTask)task, analyticsTaskState.getState());
manager.execute((DataFrameAnalyticsTask) task, analyticsTaskState.getState(), clusterState);
}
}

View File

@ -19,6 +19,7 @@ import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
@ -30,6 +31,7 @@ import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsTaskState;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.dataframe.extractor.DataFrameDataExtractorFactory;
import org.elasticsearch.xpack.ml.dataframe.persistence.DataFrameAnalyticsConfigProvider;
@ -59,7 +61,7 @@ public class DataFrameAnalyticsManager {
this.processManager = Objects.requireNonNull(processManager);
}
public void execute(DataFrameAnalyticsTask task, DataFrameAnalyticsState currentState) {
public void execute(DataFrameAnalyticsTask task, DataFrameAnalyticsState currentState, ClusterState clusterState) {
ActionListener<DataFrameAnalyticsConfig> reindexingStateListener = ActionListener.wrap(
config -> reindexDataframeAndStartAnalysis(task, config),
error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage())
@ -112,7 +114,13 @@ public class DataFrameAnalyticsManager {
);
// Retrieve configuration
configProvider.get(task.getParams().getId(), configListener);
ActionListener<Boolean> stateAliasListener = ActionListener.wrap(
aBoolean -> configProvider.get(task.getParams().getId(), configListener),
configListener::onFailure
);
// Make sure the state index and alias exist
AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessary(client, clusterState, stateAliasListener);
}
private void reindexDataframeAndStartAnalysis(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config) {