[ML] Remove JobManagers dependency on JobResultsPerister (elastic/x-pack-elasticsearch#915)

* Remove JobManagers dependency on JobResultsPerister

* Remove unneeded call to refresh the state index

Original commit: elastic/x-pack-elasticsearch@0b2351bba7
This commit is contained in:
David Kyle 2017-04-03 12:00:52 +01:00 committed by GitHub
parent 2d01c3884b
commit 622c5ae166
4 changed files with 19 additions and 26 deletions

View File

@ -268,7 +268,7 @@ public class MachineLearning implements ActionPlugin {
Auditor auditor = new Auditor(internalClient, clusterService); Auditor auditor = new Auditor(internalClient, clusterService);
UpdateJobProcessNotifier notifier = new UpdateJobProcessNotifier(settings, internalClient, clusterService, threadPool); UpdateJobProcessNotifier notifier = new UpdateJobProcessNotifier(settings, internalClient, clusterService, threadPool);
JobManager jobManager = new JobManager(settings, jobProvider, jobResultsPersister, clusterService, auditor, internalClient, notifier); JobManager jobManager = new JobManager(settings, jobProvider, clusterService, auditor, internalClient, notifier);
AutodetectProcessFactory autodetectProcessFactory; AutodetectProcessFactory autodetectProcessFactory;
NormalizerProcessFactory normalizerProcessFactory; NormalizerProcessFactory normalizerProcessFactory;
if (AUTODETECT_PROCESS.get(settings)) { if (AUTODETECT_PROCESS.get(settings)) {

View File

@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml.job;
import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterChangedEvent;
@ -18,6 +19,9 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.CheckedConsumer; import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.xpack.ml.MlMetadata; import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.action.DeleteJobAction; import org.elasticsearch.xpack.ml.action.DeleteJobAction;
import org.elasticsearch.xpack.ml.action.PutJobAction; import org.elasticsearch.xpack.ml.action.PutJobAction;
@ -27,6 +31,7 @@ import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState; import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.config.JobUpdate; import org.elasticsearch.xpack.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.ml.job.messages.Messages; import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider; import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.job.persistence.JobStorageDeletionTask; import org.elasticsearch.xpack.ml.job.persistence.JobStorageDeletionTask;
@ -35,6 +40,7 @@ import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
import java.io.IOException;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Locale; import java.util.Locale;
@ -56,7 +62,6 @@ public class JobManager extends AbstractComponent {
private final JobProvider jobProvider; private final JobProvider jobProvider;
private final ClusterService clusterService; private final ClusterService clusterService;
private final JobResultsPersister jobResultsPersister;
private final Auditor auditor; private final Auditor auditor;
private final Client client; private final Client client;
private final UpdateJobProcessNotifier updateJobProcessNotifier; private final UpdateJobProcessNotifier updateJobProcessNotifier;
@ -64,13 +69,11 @@ public class JobManager extends AbstractComponent {
/** /**
* Create a JobManager * Create a JobManager
*/ */
public JobManager(Settings settings, JobProvider jobProvider, JobResultsPersister jobResultsPersister, public JobManager(Settings settings, JobProvider jobProvider, ClusterService clusterService, Auditor auditor, Client client,
ClusterService clusterService, Auditor auditor, Client client,
UpdateJobProcessNotifier updateJobProcessNotifier) { UpdateJobProcessNotifier updateJobProcessNotifier) {
super(settings); super(settings);
this.jobProvider = Objects.requireNonNull(jobProvider); this.jobProvider = Objects.requireNonNull(jobProvider);
this.clusterService = Objects.requireNonNull(clusterService); this.clusterService = Objects.requireNonNull(clusterService);
this.jobResultsPersister = Objects.requireNonNull(jobResultsPersister);
this.auditor = Objects.requireNonNull(auditor); this.auditor = Objects.requireNonNull(auditor);
this.client = Objects.requireNonNull(client); this.client = Objects.requireNonNull(client);
this.updateJobProcessNotifier = updateJobProcessNotifier; this.updateJobProcessNotifier = updateJobProcessNotifier;
@ -376,7 +379,15 @@ public class JobManager extends AbstractComponent {
* @param modelSnapshot the updated model snapshot object to be stored * @param modelSnapshot the updated model snapshot object to be stored
*/ */
public void updateModelSnapshot(ModelSnapshot modelSnapshot, Consumer<Boolean> handler, Consumer<Exception> errorHandler) { public void updateModelSnapshot(ModelSnapshot modelSnapshot, Consumer<Boolean> handler, Consumer<Exception> errorHandler) {
jobResultsPersister.updateModelSnapshot(modelSnapshot, handler, errorHandler); String index = AnomalyDetectorsIndex.jobResultsAliasedName(modelSnapshot.getJobId());
IndexRequest indexRequest = new IndexRequest(index, ModelSnapshot.TYPE.getPreferredName(), ModelSnapshot.documentId(modelSnapshot));
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
modelSnapshot.toXContent(builder, ToXContent.EMPTY_PARAMS);
indexRequest.source(builder);
} catch (IOException e) {
errorHandler.accept(e);
}
client.index(indexRequest, ActionListener.wrap(r -> handler.accept(true), errorHandler));
} }
private static MlMetadata.Builder createMlMetadataBuilder(ClusterState currentState) { private static MlMetadata.Builder createMlMetadataBuilder(ClusterState currentState) {

View File

@ -231,14 +231,7 @@ public class JobResultsPersister extends AbstractComponent {
public void persistQuantiles(Quantiles quantiles) { public void persistQuantiles(Quantiles quantiles) {
Persistable persistable = new Persistable(quantiles.getJobId(), quantiles, Quantiles.TYPE.getPreferredName(), Persistable persistable = new Persistable(quantiles.getJobId(), quantiles, Quantiles.TYPE.getPreferredName(),
Quantiles.documentId(quantiles.getJobId())); Quantiles.documentId(quantiles.getJobId()));
if (persistable.persist(AnomalyDetectorsIndex.jobStateIndexName())) { persistable.persist(AnomalyDetectorsIndex.jobStateIndexName());
// Refresh the index when persisting quantiles so that previously
// persisted results will be available for searching. Do this using the
// indices API rather than the index API (used to write the quantiles
// above), because this will refresh all shards rather than just the
// shard that the quantiles document itself was written to.
commitStateWrites(quantiles.getJobId());
}
} }
/** /**
@ -250,17 +243,6 @@ public class JobResultsPersister extends AbstractComponent {
persistable.persist(AnomalyDetectorsIndex.jobResultsAliasedName(modelSnapshot.getJobId())); persistable.persist(AnomalyDetectorsIndex.jobResultsAliasedName(modelSnapshot.getJobId()));
} }
public void updateModelSnapshot(ModelSnapshot modelSnapshot, Consumer<Boolean> handler, Consumer<Exception> errorHandler) {
String index = AnomalyDetectorsIndex.jobResultsAliasedName(modelSnapshot.getJobId());
IndexRequest indexRequest = new IndexRequest(index, ModelSnapshot.TYPE.getPreferredName(), ModelSnapshot.documentId(modelSnapshot));
try {
indexRequest.source(toXContentBuilder(modelSnapshot));
} catch (IOException e) {
errorHandler.accept(e);
}
client.index(indexRequest, ActionListener.wrap(r -> handler.accept(true), errorHandler));
}
/** /**
* Persist the memory usage data * Persist the memory usage data
*/ */

View File

@ -108,7 +108,7 @@ public class JobManagerTests extends ESTestCase {
JobResultsPersister jobResultsPersister = mock(JobResultsPersister.class); JobResultsPersister jobResultsPersister = mock(JobResultsPersister.class);
Client client = mock(Client.class); Client client = mock(Client.class);
UpdateJobProcessNotifier notifier = mock(UpdateJobProcessNotifier.class); UpdateJobProcessNotifier notifier = mock(UpdateJobProcessNotifier.class);
return new JobManager(settings, jobProvider, jobResultsPersister, clusterService, auditor, client, notifier); return new JobManager(settings, jobProvider, clusterService, auditor, client, notifier);
} }
private ClusterState createClusterState() { private ClusterState createClusterState() {