diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index b427e4f026e..6d25fda6abf 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -268,7 +268,7 @@ public class MachineLearning implements ActionPlugin { Auditor auditor = new Auditor(internalClient, clusterService); UpdateJobProcessNotifier notifier = new UpdateJobProcessNotifier(settings, internalClient, clusterService, threadPool); - JobManager jobManager = new JobManager(settings, jobProvider, jobResultsPersister, clusterService, auditor, internalClient, notifier); + JobManager jobManager = new JobManager(settings, jobProvider, clusterService, auditor, internalClient, notifier); AutodetectProcessFactory autodetectProcessFactory; NormalizerProcessFactory normalizerProcessFactory; if (AUTODETECT_PROCESS.get(settings)) { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java index 13a0333b12f..bfe9b1e141b 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml.job; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterChangedEvent; @@ -18,6 +19,9 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.CheckedConsumer; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.xpack.ml.MlMetadata; import org.elasticsearch.xpack.ml.action.DeleteJobAction; import org.elasticsearch.xpack.ml.action.PutJobAction; @@ -27,6 +31,7 @@ import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.JobState; import org.elasticsearch.xpack.ml.job.config.JobUpdate; import org.elasticsearch.xpack.ml.job.messages.Messages; +import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.ml.job.persistence.JobProvider; import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; import org.elasticsearch.xpack.ml.job.persistence.JobStorageDeletionTask; @@ -35,6 +40,7 @@ import org.elasticsearch.xpack.ml.notifications.Auditor; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; +import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.Locale; @@ -56,7 +62,6 @@ public class JobManager extends AbstractComponent { private final JobProvider jobProvider; private final ClusterService clusterService; - private final JobResultsPersister jobResultsPersister; private final Auditor auditor; private final Client client; private final UpdateJobProcessNotifier updateJobProcessNotifier; @@ -64,13 +69,11 @@ public class JobManager extends AbstractComponent { /** * Create a JobManager */ - public JobManager(Settings settings, JobProvider jobProvider, JobResultsPersister jobResultsPersister, - ClusterService clusterService, Auditor auditor, Client client, + public JobManager(Settings settings, JobProvider jobProvider, ClusterService clusterService, Auditor auditor, Client client, UpdateJobProcessNotifier updateJobProcessNotifier) { super(settings); this.jobProvider = Objects.requireNonNull(jobProvider); this.clusterService = Objects.requireNonNull(clusterService); - this.jobResultsPersister = Objects.requireNonNull(jobResultsPersister); this.auditor = Objects.requireNonNull(auditor); this.client = Objects.requireNonNull(client); this.updateJobProcessNotifier = updateJobProcessNotifier; @@ -376,7 +379,15 @@ public class JobManager extends AbstractComponent { * @param modelSnapshot the updated model snapshot object to be stored */ public void updateModelSnapshot(ModelSnapshot modelSnapshot, Consumer handler, Consumer errorHandler) { - jobResultsPersister.updateModelSnapshot(modelSnapshot, handler, errorHandler); + String index = AnomalyDetectorsIndex.jobResultsAliasedName(modelSnapshot.getJobId()); + IndexRequest indexRequest = new IndexRequest(index, ModelSnapshot.TYPE.getPreferredName(), ModelSnapshot.documentId(modelSnapshot)); + try (XContentBuilder builder = XContentFactory.jsonBuilder()) { + modelSnapshot.toXContent(builder, ToXContent.EMPTY_PARAMS); + indexRequest.source(builder); + } catch (IOException e) { + errorHandler.accept(e); + } + client.index(indexRequest, ActionListener.wrap(r -> handler.accept(true), errorHandler)); } private static MlMetadata.Builder createMlMetadataBuilder(ClusterState currentState) { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java index 13413000774..0d1924c38d1 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java @@ -231,14 +231,7 @@ public class JobResultsPersister extends AbstractComponent { public void persistQuantiles(Quantiles quantiles) { Persistable persistable = new Persistable(quantiles.getJobId(), quantiles, Quantiles.TYPE.getPreferredName(), Quantiles.documentId(quantiles.getJobId())); - if (persistable.persist(AnomalyDetectorsIndex.jobStateIndexName())) { - // Refresh the index when persisting quantiles so that previously - // persisted results will be available for searching. Do this using the - // indices API rather than the index API (used to write the quantiles - // above), because this will refresh all shards rather than just the - // shard that the quantiles document itself was written to. - commitStateWrites(quantiles.getJobId()); - } + persistable.persist(AnomalyDetectorsIndex.jobStateIndexName()); } /** @@ -250,17 +243,6 @@ public class JobResultsPersister extends AbstractComponent { persistable.persist(AnomalyDetectorsIndex.jobResultsAliasedName(modelSnapshot.getJobId())); } - public void updateModelSnapshot(ModelSnapshot modelSnapshot, Consumer handler, Consumer errorHandler) { - String index = AnomalyDetectorsIndex.jobResultsAliasedName(modelSnapshot.getJobId()); - IndexRequest indexRequest = new IndexRequest(index, ModelSnapshot.TYPE.getPreferredName(), ModelSnapshot.documentId(modelSnapshot)); - try { - indexRequest.source(toXContentBuilder(modelSnapshot)); - } catch (IOException e) { - errorHandler.accept(e); - } - client.index(indexRequest, ActionListener.wrap(r -> handler.accept(true), errorHandler)); - } - /** * Persist the memory usage data */ diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java index 2dd68013dfb..a351ff82393 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java @@ -108,7 +108,7 @@ public class JobManagerTests extends ESTestCase { JobResultsPersister jobResultsPersister = mock(JobResultsPersister.class); Client client = mock(Client.class); UpdateJobProcessNotifier notifier = mock(UpdateJobProcessNotifier.class); - return new JobManager(settings, jobProvider, jobResultsPersister, clusterService, auditor, client, notifier); + return new JobManager(settings, jobProvider, clusterService, auditor, client, notifier); } private ClusterState createClusterState() {