When a user upgrades between versions, they may stop their ML jobs. Then when the upgrade is complete, they will want to open the jobs again. But, when opening a job, we attempt to clear out the jobs finished_time. If the job configuration has adjusted between the versions (i.e. added a new field), it will dynamically update the .ml-config index. We should instead manually change the mapping to be the updated version.
This commit is contained in:
parent
0549c40ac1
commit
a497263c47
|
@ -54,6 +54,7 @@ import org.elasticsearch.xpack.core.ml.job.config.JobState;
|
|||
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.JobUpdate;
|
||||
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
|
||||
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
|
||||
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
|
||||
import org.elasticsearch.xpack.ml.MachineLearning;
|
||||
import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck;
|
||||
|
@ -96,13 +97,14 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
|
|||
private final JobConfigProvider jobConfigProvider;
|
||||
private final MlMemoryTracker memoryTracker;
|
||||
private final MlConfigMigrationEligibilityCheck migrationEligibilityCheck;
|
||||
private final Client client;
|
||||
|
||||
@Inject
|
||||
public TransportOpenJobAction(Settings settings, TransportService transportService, ThreadPool threadPool,
|
||||
XPackLicenseState licenseState, ClusterService clusterService,
|
||||
PersistentTasksService persistentTasksService, ActionFilters actionFilters,
|
||||
IndexNameExpressionResolver indexNameExpressionResolver,
|
||||
JobConfigProvider jobConfigProvider, MlMemoryTracker memoryTracker) {
|
||||
JobConfigProvider jobConfigProvider, MlMemoryTracker memoryTracker, Client client) {
|
||||
super(OpenJobAction.NAME, transportService, clusterService, threadPool, actionFilters,OpenJobAction.Request::new,
|
||||
indexNameExpressionResolver);
|
||||
this.licenseState = licenseState;
|
||||
|
@ -110,6 +112,7 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
|
|||
this.jobConfigProvider = jobConfigProvider;
|
||||
this.memoryTracker = memoryTracker;
|
||||
this.migrationEligibilityCheck = new MlConfigMigrationEligibilityCheck(settings, clusterService);
|
||||
this.client = client;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -222,7 +225,7 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
|
|||
ActionListener<NodeAcknowledgedResponse> clearJobFinishTime = ActionListener.wrap(
|
||||
response -> {
|
||||
if (response.isAcknowledged()) {
|
||||
clearJobFinishedTime(response, jobParams.getJobId(), listener);
|
||||
clearJobFinishedTime(response, state, jobParams.getJobId(), listener);
|
||||
} else {
|
||||
listener.onResponse(response);
|
||||
}
|
||||
|
@ -307,17 +310,33 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
|
|||
});
|
||||
}
|
||||
|
||||
private void clearJobFinishedTime(NodeAcknowledgedResponse response, String jobId, ActionListener<NodeAcknowledgedResponse> listener) {
|
||||
JobUpdate update = new JobUpdate.Builder(jobId).setClearFinishTime(true).build();
|
||||
|
||||
jobConfigProvider.updateJob(jobId, update, null, ActionListener.wrap(
|
||||
job -> listener.onResponse(response),
|
||||
e -> {
|
||||
logger.error("[" + jobId + "] Failed to clear finished_time", e);
|
||||
// Not a critical error so continue
|
||||
listener.onResponse(response);
|
||||
}
|
||||
));
|
||||
private void clearJobFinishedTime(NodeAcknowledgedResponse response,
|
||||
ClusterState clusterState,
|
||||
String jobId,
|
||||
ActionListener<NodeAcknowledgedResponse> listener) {
|
||||
final JobUpdate update = new JobUpdate.Builder(jobId).setClearFinishTime(true).build();
|
||||
ActionListener<Job> clearedTimeListener = ActionListener.wrap(
|
||||
job -> listener.onResponse(response),
|
||||
e -> {
|
||||
logger.error(new ParameterizedMessage("[{}] Failed to clear finished_time", jobId), e);
|
||||
// Not a critical error so continue
|
||||
listener.onResponse(response);
|
||||
}
|
||||
);
|
||||
ActionListener<Boolean> mappingsUpdatedListener = ActionListener.wrap(
|
||||
mappingUpdateResponse -> jobConfigProvider.updateJob(jobId, update, null, clearedTimeListener),
|
||||
e -> {
|
||||
logger.error(new ParameterizedMessage("[{}] Failed to update mapping; not clearing finished_time", jobId), e);
|
||||
// Not a critical error so continue without attempting to clear finish time
|
||||
listener.onResponse(response);
|
||||
}
|
||||
);
|
||||
ElasticsearchMappings.addDocMappingIfMissing(
|
||||
MlConfigIndex.indexName(),
|
||||
MlConfigIndex::mapping,
|
||||
client,
|
||||
clusterState,
|
||||
mappingsUpdatedListener);
|
||||
}
|
||||
|
||||
private void cancelJobStart(PersistentTasksCustomMetadata.PersistentTask<OpenJobAction.JobParams> persistentTask, Exception exception,
|
||||
|
|
Loading…
Reference in New Issue