From eaf672f6121c4dc2a48f3524c80fa18d39dc9d2e Mon Sep 17 00:00:00 2001 From: David Roberts Date: Mon, 21 May 2018 14:29:45 +0100 Subject: [PATCH] [ML] Don't install empty ML metadata on startup (#30751) This change is to support rolling upgrade from a pre-6.3 default distribution (i.e. without X-Pack) to a 6.3+ default distribution (i.e. with X-Pack). The ML metadata is no longer eagerly added to the cluster state as soon as the master node has X-Pack available. Instead, it is added when the first ML job is created. As a result all methods that get the ML metadata need to be able to handle the situation where there is no ML metadata in the current cluster state. They do this by behaving as though an empty ML metadata was present. This logic is encapsulated by always asking for the current ML metadata using a static method on the MlMetadata class. Relates #30731 --- .../xpack/core/ml/MlMetadata.java | 9 +++ .../persistence/AnomalyDetectorsIndex.java | 4 +- .../xpack/ml/MachineLearningFeatureSet.java | 11 +-- .../xpack/ml/MlAssignmentNotifier.java | 4 +- .../xpack/ml/MlInitializationService.java | 42 ----------- .../ml/action/TransportCloseJobAction.java | 6 +- .../action/TransportDeleteDatafeedAction.java | 4 +- .../action/TransportDeleteFilterAction.java | 4 +- .../ml/action/TransportDeleteJobAction.java | 11 +-- .../TransportFinalizeJobExecutionAction.java | 4 +- .../TransportGetCalendarEventsAction.java | 3 +- .../action/TransportGetDatafeedsAction.java | 6 +- .../TransportGetDatafeedsStatsAction.java | 7 +- .../action/TransportGetJobsStatsAction.java | 4 +- .../ml/action/TransportOpenJobAction.java | 9 +-- .../TransportPreviewDatafeedAction.java | 3 +- .../ml/action/TransportPutCalendarAction.java | 10 +-- .../ml/action/TransportPutDatafeedAction.java | 2 +- .../action/TransportStartDatafeedAction.java | 5 +- .../action/TransportStopDatafeedAction.java | 2 +- .../TransportUpdateCalendarJobAction.java | 4 +- .../action/TransportUpdateDatafeedAction.java | 4 +- .../xpack/ml/datafeed/DatafeedManager.java | 6 +- .../ml/datafeed/DatafeedNodeSelector.java | 3 +- .../xpack/ml/job/JobManager.java | 22 ++---- .../AbstractExpiredJobDataRemover.java | 7 +- .../ml/job/retention/UnusedStateRemover.java | 10 +-- .../ml/MachineLearningFeatureSetTests.java | 25 ++++++- .../ml/MlInitializationServiceTests.java | 73 +------------------ .../ml/datafeed/DatafeedManagerTests.java | 30 ++++---- .../xpack/ml/integration/DeleteJobIT.java | 4 +- .../xpack/ml/job/JobManagerTests.java | 2 +- .../ml/job/persistence/JobProviderTests.java | 8 +- .../xpack/ml/support/BaseMlIntegTestCase.java | 7 +- 34 files changed, 99 insertions(+), 256 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java index b709e32946e..6af323f1510 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java @@ -9,6 +9,7 @@ import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.Version; import org.elasticsearch.cluster.AbstractDiffable; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.Diff; import org.elasticsearch.cluster.DiffableUtils; import org.elasticsearch.cluster.NamedDiff; @@ -467,6 +468,14 @@ public class MlMetadata implements MetaData.Custom { } } + public static MlMetadata getMlMetadata(ClusterState state) { + MlMetadata mlMetadata = (state == null) ? null : state.getMetaData().custom(MLMetadataField.TYPE); + if (mlMetadata == null) { + return EMPTY_METADATA; + } + return mlMetadata; + } + public static class JobAlreadyMarkedAsDeletedException extends RuntimeException { } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java index 1ac842e8898..4e51d7b6c1e 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java @@ -6,7 +6,6 @@ package org.elasticsearch.xpack.core.ml.job.persistence; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.xpack.core.ml.MLMetadataField; import org.elasticsearch.xpack.core.ml.MlMetadata; /** @@ -47,8 +46,7 @@ public final class AnomalyDetectorsIndex { * @return The index name */ public static String getPhysicalIndexFromState(ClusterState state, String jobId) { - MlMetadata meta = state.getMetaData().custom(MLMetadataField.TYPE); - return meta.getJobs().get(jobId).getResultsIndexName(); + return MlMetadata.getMlMetadata(state).getJobs().get(jobId).getResultsIndexName(); } /** diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSet.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSet.java index f533fc1aa5a..05af1ffee17 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSet.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSet.java @@ -23,7 +23,6 @@ import org.elasticsearch.xpack.core.XPackFeatureSet; import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.XPackSettings; import org.elasticsearch.xpack.core.XPackField; -import org.elasticsearch.xpack.core.ml.MLMetadataField; import org.elasticsearch.xpack.core.ml.MachineLearningFeatureSetUsage; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction; @@ -132,15 +131,7 @@ public class MachineLearningFeatureSet implements XPackFeatureSet { @Override public void usage(ActionListener listener) { ClusterState state = clusterService.state(); - MlMetadata mlMetadata = state.getMetaData().custom(MLMetadataField.TYPE); - - // Handle case when usage is called but MlMetadata has not been installed yet - if (mlMetadata == null) { - listener.onResponse(new MachineLearningFeatureSetUsage(available(), enabled, - Collections.emptyMap(), Collections.emptyMap())); - } else { - new Retriever(client, mlMetadata, available(), enabled()).execute(listener); - } + new Retriever(client, MlMetadata.getMlMetadata(state), available(), enabled()).execute(listener); } public static class Retriever { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java index 87cf03caa99..37d714d1777 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java @@ -13,7 +13,6 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xpack.core.ml.MLMetadataField; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.action.OpenJobAction; import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction; @@ -90,8 +89,7 @@ public class MlAssignmentNotifier extends AbstractComponent implements ClusterSt } } else if (StartDatafeedAction.TASK_NAME.equals(currentTask.getTaskName())) { String datafeedId = ((StartDatafeedAction.DatafeedParams) currentTask.getParams()).getDatafeedId(); - MlMetadata mlMetadata = event.state().getMetaData().custom(MLMetadataField.TYPE); - DatafeedConfig datafeedConfig = mlMetadata.getDatafeed(datafeedId); + DatafeedConfig datafeedConfig = MlMetadata.getMlMetadata(event.state()).getDatafeed(datafeedId); if (currentAssignment.getExecutorNode() == null) { String msg = "No node found to start datafeed [" + datafeedId +"]. Reasons [" + currentAssignment.getExplanation() + "]"; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java index 5dba8ce943c..c96a12ffa10 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java @@ -7,20 +7,13 @@ package org.elasticsearch.xpack.ml; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterChangedEvent; -import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; -import org.elasticsearch.cluster.ClusterStateUpdateTask; -import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.LifecycleListener; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xpack.core.ml.MLMetadataField; -import org.elasticsearch.xpack.core.ml.MlMetadata; - -import java.util.concurrent.atomic.AtomicBoolean; class MlInitializationService extends AbstractComponent implements ClusterStateListener { @@ -28,8 +21,6 @@ class MlInitializationService extends AbstractComponent implements ClusterStateL private final ClusterService clusterService; private final Client client; - private final AtomicBoolean installMlMetadataCheck = new AtomicBoolean(false); - private volatile MlDailyMaintenanceService mlDailyMaintenanceService; MlInitializationService(Settings settings, ThreadPool threadPool, ClusterService clusterService, Client client) { @@ -48,45 +39,12 @@ class MlInitializationService extends AbstractComponent implements ClusterStateL } if (event.localNodeMaster()) { - MetaData metaData = event.state().metaData(); - installMlMetadata(metaData); installDailyMaintenanceService(); } else { uninstallDailyMaintenanceService(); } } - private void installMlMetadata(MetaData metaData) { - if (metaData.custom(MLMetadataField.TYPE) == null) { - if (installMlMetadataCheck.compareAndSet(false, true)) { - threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> - clusterService.submitStateUpdateTask("install-ml-metadata", new ClusterStateUpdateTask() { - @Override - public ClusterState execute(ClusterState currentState) throws Exception { - // If the metadata has been added already don't try to update - if (currentState.metaData().custom(MLMetadataField.TYPE) != null) { - return currentState; - } - ClusterState.Builder builder = new ClusterState.Builder(currentState); - MetaData.Builder metadataBuilder = MetaData.builder(currentState.metaData()); - metadataBuilder.putCustom(MLMetadataField.TYPE, MlMetadata.EMPTY_METADATA); - builder.metaData(metadataBuilder.build()); - return builder.build(); - } - - @Override - public void onFailure(String source, Exception e) { - installMlMetadataCheck.set(false); - logger.error("unable to install ml metadata", e); - } - }) - ); - } - } else { - installMlMetadataCheck.set(false); - } - } - private void installDailyMaintenanceService() { if (mlDailyMaintenanceService == null) { mlDailyMaintenanceService = new MlDailyMaintenanceService(clusterService.getClusterName(), threadPool, client); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java index 7d113a838dd..fa649e54196 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java @@ -27,7 +27,6 @@ import org.elasticsearch.discovery.MasterNotDiscoveredException; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import org.elasticsearch.xpack.core.ml.MLMetadataField; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.action.CloseJobAction; import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction; @@ -92,8 +91,7 @@ public class TransportCloseJobAction extends TransportTasksAction openJobIds, List closingJobIds) { PersistentTasksCustomMetaData tasksMetaData = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - MlMetadata maybeNull = state.metaData().custom(MLMetadataField.TYPE); - final MlMetadata mlMetadata = (maybeNull == null) ? MlMetadata.EMPTY_METADATA : maybeNull; + final MlMetadata mlMetadata = MlMetadata.getMlMetadata(state); List failedJobs = new ArrayList<>(); @@ -107,7 +105,7 @@ public class TransportCloseJobAction extends TransportTasksAction expandedJobIds = mlMetadata.expandJobIds(request.getJobId(), request.allowNoJobs()); - expandedJobIds.stream().forEach(jobIdProcessor::accept); + expandedJobIds.forEach(jobIdProcessor::accept); if (request.isForce() == false && failedJobs.size() > 0) { if (expandedJobIds.size() == 1) { throw ExceptionsHelper.conflictStatusException("cannot close job [{}] because it failed, use force close", diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteDatafeedAction.java index 79995af9e62..220d97e89ba 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteDatafeedAction.java @@ -119,8 +119,8 @@ public class TransportDeleteDatafeedAction extends TransportMasterNodeAction jobs = currentMlMetadata.getJobs(); + Map jobs = MlMetadata.getMlMetadata(state).getJobs(); List currentlyUsedBy = new ArrayList<>(); for (Job job : jobs.values()) { List detectors = job.getAnalysisConfig().getDetectors(); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java index b664487c77d..90821b302fc 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java @@ -200,10 +200,9 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction listener, boolean force) { clusterService.submitStateUpdateTask("mark-job-as-deleted", new ClusterStateUpdateTask() { @Override - public ClusterState execute(ClusterState currentState) throws Exception { - MlMetadata currentMlMetadata = currentState.metaData().custom(MLMetadataField.TYPE); + public ClusterState execute(ClusterState currentState) { PersistentTasksCustomMetaData tasks = currentState.metaData().custom(PersistentTasksCustomMetaData.TYPE); - MlMetadata.Builder builder = new MlMetadata.Builder(currentMlMetadata); + MlMetadata.Builder builder = new MlMetadata.Builder(MlMetadata.getMlMetadata(currentState)); builder.markJobAsDeleted(jobId, tasks, force); return buildNewClusterState(currentState, builder); } @@ -248,11 +247,7 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction jobGroups; String requestId = request.getJobId(); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsAction.java index c94449e8c36..b4e3851eda8 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsAction.java @@ -18,7 +18,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.ml.action.GetDatafeedsAction; -import org.elasticsearch.xpack.core.ml.MLMetadataField; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.action.util.QueryPage; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; @@ -52,10 +51,7 @@ public class TransportGetDatafeedsAction extends TransportMasterNodeReadAction listener) throws Exception { logger.debug("Get datafeed '{}'", request.getDatafeedId()); - MlMetadata mlMetadata = state.metaData().custom(MLMetadataField.TYPE); - if (mlMetadata == null) { - mlMetadata = MlMetadata.EMPTY_METADATA; - } + MlMetadata mlMetadata = MlMetadata.getMlMetadata(state); Set expandedDatafeedIds = mlMetadata.expandDatafeedIds(request.getDatafeedId(), request.allowNoDatafeeds()); List datafeedConfigs = new ArrayList<>(); for (String expandedDatafeedId : expandedDatafeedIds) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsStatsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsStatsAction.java index 0d5b2b20209..41c8379c39f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsStatsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsStatsAction.java @@ -18,7 +18,6 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import org.elasticsearch.xpack.core.ml.MLMetadataField; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction; import org.elasticsearch.xpack.core.ml.action.util.QueryPage; @@ -56,11 +55,7 @@ public class TransportGetDatafeedsStatsAction extends TransportMasterNodeReadAct ActionListener listener) throws Exception { logger.debug("Get stats for datafeed '{}'", request.getDatafeedId()); - MlMetadata mlMetadata = state.metaData().custom(MLMetadataField.TYPE); - if (mlMetadata == null) { - mlMetadata = MlMetadata.EMPTY_METADATA; - } - + MlMetadata mlMetadata = MlMetadata.getMlMetadata(state); Set expandedDatafeedIds = mlMetadata.expandDatafeedIds(request.getDatafeedId(), request.allowNoDatafeeds()); PersistentTasksCustomMetaData tasksInProgress = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java index f8f8fffa5b7..78bfe2c7bc6 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java @@ -23,7 +23,6 @@ import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import org.elasticsearch.xpack.core.ml.MLMetadataField; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction; import org.elasticsearch.xpack.core.ml.action.util.QueryPage; @@ -69,8 +68,7 @@ public class TransportGetJobsStatsAction extends TransportTasksAction listener) { - MlMetadata clusterMlMetadata = clusterService.state().metaData().custom(MLMetadataField.TYPE); - MlMetadata mlMetadata = (clusterMlMetadata == null) ? MlMetadata.EMPTY_METADATA : clusterMlMetadata; + MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterService.state()); request.setExpandedJobsIds(new ArrayList<>(mlMetadata.expandJobIds(request.getJobId(), request.allowNoJobs()))); ActionListener finalListener = listener; listener = ActionListener.wrap(response -> gatherStatsForClosedJobs(mlMetadata, diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java index 5ae84129254..5e829c72756 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java @@ -49,7 +49,6 @@ import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.XPackField; -import org.elasticsearch.xpack.core.ml.MLMetadataField; import org.elasticsearch.xpack.core.ml.MlMetaIndex; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.action.OpenJobAction; @@ -163,7 +162,7 @@ public class TransportOpenJobAction extends TransportMasterNodeAction compatibleJobTypes = Job.getCompatibleJobTypes(node.getVersion()); if (compatibleJobTypes.contains(job.getJobType()) == false) { @@ -474,8 +473,7 @@ public class TransportOpenJobAction extends TransportMasterNodeAction missingMappingsListener = ActionListener.wrap( response -> { - MlMetadata mlMetadata = clusterService.state().getMetaData().custom(MLMetadataField.TYPE); - Job job = mlMetadata.getJobs().get(jobParams.getJobId()); + Job job = MlMetadata.getMlMetadata(clusterService.state()).getJobs().get(jobParams.getJobId()); if (job != null) { Version jobVersion = job.getJobVersion(); Long jobEstablishedModelMemory = job.getEstablishedModelMemory(); @@ -650,8 +648,7 @@ public class TransportOpenJobAction extends TransportMasterNodeAction listener) { - MlMetadata mlMetadata = clusterService.state().getMetaData().custom(MLMetadataField.TYPE); + MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterService.state()); DatafeedConfig datafeed = mlMetadata.getDatafeed(request.getDatafeedId()); if (datafeed == null) { throw ExceptionsHelper.missingDatafeedException(request.getDatafeedId()); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutCalendarAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutCalendarAction.java index 11e8fbf912c..1393d663fb2 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutCalendarAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutCalendarAction.java @@ -14,9 +14,7 @@ import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.client.Client; -import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.ToXContent; @@ -26,16 +24,12 @@ import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.ml.action.PutCalendarAction; -import org.elasticsearch.xpack.core.ml.MLMetadataField; import org.elasticsearch.xpack.core.ml.MlMetaIndex; -import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.calendars.Calendar; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import java.io.IOException; import java.util.Collections; -import java.util.List; -import java.util.function.Consumer; import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; @@ -43,17 +37,15 @@ import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; public class TransportPutCalendarAction extends HandledTransportAction { private final Client client; - private final ClusterService clusterService; @Inject public TransportPutCalendarAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, - Client client, ClusterService clusterService) { + Client client) { super(settings, PutCalendarAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, PutCalendarAction.Request::new); this.client = client; - this.clusterService = clusterService; } @Override diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDatafeedAction.java index 0c492a0817c..2b4304a205b 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDatafeedAction.java @@ -141,7 +141,7 @@ public class TransportPutDatafeedAction extends TransportMasterNodeAction(listener, StopDatafeedAction.Response::new)); } } else { - MlMetadata mlMetadata = state.getMetaData().custom(MLMetadataField.TYPE); + MlMetadata mlMetadata = MlMetadata.getMlMetadata(state); PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); List startedDatafeeds = new ArrayList<>(); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateCalendarJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateCalendarJobAction.java index 5df7e890a8f..0524cb28a0c 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateCalendarJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateCalendarJobAction.java @@ -16,7 +16,6 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import org.elasticsearch.xpack.core.ml.MLMetadataField; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.action.PutCalendarAction; import org.elasticsearch.xpack.core.ml.action.UpdateCalendarJobAction; @@ -48,8 +47,7 @@ public class TransportUpdateCalendarJobAction extends HandledTransportAction listener) { ClusterState clusterState = clusterService.state(); - MlMetadata maybeNullMetaData = clusterState.getMetaData().custom(MLMetadataField.TYPE); - final MlMetadata mlMetadata = maybeNullMetaData == null ? MlMetadata.EMPTY_METADATA : maybeNullMetaData; + final MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState); Set jobIdsToAdd = Strings.tokenizeByCommaToSet(request.getJobIdsToAddExpression()); Set jobIdsToRemove = Strings.tokenizeByCommaToSet(request.getJobIdsToRemoveExpression()); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateDatafeedAction.java index e50adfa8275..4d752fe2940 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateDatafeedAction.java @@ -63,9 +63,9 @@ public class TransportUpdateDatafeedAction extends TransportMasterNodeAction taskHandler) { String datafeedId = task.getDatafeedId(); ClusterState state = clusterService.state(); - MlMetadata mlMetadata = state.metaData().custom(MLMetadataField.TYPE); - if (mlMetadata == null) { - mlMetadata = MlMetadata.EMPTY_METADATA; - } + MlMetadata mlMetadata = MlMetadata.getMlMetadata(state); DatafeedConfig datafeed = mlMetadata.getDatafeed(datafeedId); Job job = mlMetadata.getJobs().get(datafeed.getJobId()); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java index e52906a605d..37f9715d094 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java @@ -12,7 +12,6 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.logging.Loggers; -import org.elasticsearch.xpack.core.ml.MLMetadataField; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.job.config.JobState; @@ -33,7 +32,7 @@ public class DatafeedNodeSelector { private final IndexNameExpressionResolver resolver; public DatafeedNodeSelector(ClusterState clusterState, IndexNameExpressionResolver resolver, String datafeedId) { - MlMetadata mlMetadata = Objects.requireNonNull(clusterState.metaData().custom(MLMetadataField.TYPE)); + MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState); PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); this.datafeed = mlMetadata.getDatafeed(datafeedId); this.jobTask = MlMetadata.getJobTask(datafeed.getJobId(), tasks); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java index 0f6a0f44cbf..2d67e64ec60 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java @@ -133,8 +133,7 @@ public class JobManager extends AbstractComponent { * @throws ResourceNotFoundException if no job matches {@code jobId} */ public static Job getJobOrThrowIfUnknown(String jobId, ClusterState clusterState) { - MlMetadata mlMetadata = clusterState.getMetaData().custom(MLMetadataField.TYPE); - Job job = (mlMetadata == null) ? null : mlMetadata.getJobs().get(jobId); + Job job = MlMetadata.getMlMetadata(clusterState).getJobs().get(jobId); if (job == null) { throw ExceptionsHelper.missingJobException(jobId); } @@ -142,11 +141,7 @@ public class JobManager extends AbstractComponent { } private Set expandJobIds(String expression, boolean allowNoJobs, ClusterState clusterState) { - MlMetadata mlMetadata = clusterState.getMetaData().custom(MLMetadataField.TYPE); - if (mlMetadata == null) { - mlMetadata = MlMetadata.EMPTY_METADATA; - } - return mlMetadata.expandJobIds(expression, allowNoJobs); + return MlMetadata.getMlMetadata(clusterState).expandJobIds(expression, allowNoJobs); } /** @@ -160,7 +155,7 @@ public class JobManager extends AbstractComponent { */ public QueryPage expandJobs(String expression, boolean allowNoJobs, ClusterState clusterState) { Set expandedJobIds = expandJobIds(expression, allowNoJobs, clusterState); - MlMetadata mlMetadata = clusterState.getMetaData().custom(MLMetadataField.TYPE); + MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState); List jobs = new ArrayList<>(); for (String expandedJobId : expandedJobIds) { jobs.add(mlMetadata.getJobs().get(expandedJobId)); @@ -188,8 +183,8 @@ public class JobManager extends AbstractComponent { DEPRECATION_LOGGER.deprecated("Creating jobs with delimited data format is deprecated. Please use xcontent instead."); } - MlMetadata currentMlMetadata = state.metaData().custom(MLMetadataField.TYPE); - if (currentMlMetadata != null && currentMlMetadata.getJobs().containsKey(job.getId())) { + MlMetadata currentMlMetadata = MlMetadata.getMlMetadata(state); + if (currentMlMetadata.getJobs().containsKey(job.getId())) { actionListener.onFailure(ExceptionsHelper.jobAlreadyExists(job.getId())); return; } @@ -469,8 +464,8 @@ public class JobManager extends AbstractComponent { } @Override - public ClusterState execute(ClusterState currentState) throws Exception { - MlMetadata currentMlMetadata = currentState.metaData().custom(MLMetadataField.TYPE); + public ClusterState execute(ClusterState currentState) { + MlMetadata currentMlMetadata = MlMetadata.getMlMetadata(currentState); if (currentMlMetadata.getJobs().containsKey(jobId) == false) { // We wouldn't have got here if the job never existed so // the Job must have been deleted by another action. @@ -560,8 +555,7 @@ public class JobManager extends AbstractComponent { } private static MlMetadata.Builder createMlMetadataBuilder(ClusterState currentState) { - MlMetadata currentMlMetadata = currentState.metaData().custom(MLMetadataField.TYPE); - return new MlMetadata.Builder(currentMlMetadata); + return new MlMetadata.Builder(MlMetadata.getMlMetadata(currentState)); } private static ClusterState buildNewClusterState(ClusterState currentState, MlMetadata.Builder builder) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java index aa003d29559..8364e015a34 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java @@ -11,7 +11,6 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.xpack.core.ml.MLMetadataField; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.results.Result; @@ -61,12 +60,8 @@ abstract class AbstractExpiredJobDataRemover implements MlDataRemover { } private Iterator newJobIterator() { - List jobs = new ArrayList<>(); ClusterState clusterState = clusterService.state(); - MlMetadata mlMetadata = clusterState.getMetaData().custom(MLMetadataField.TYPE); - if (mlMetadata != null) { - jobs.addAll(mlMetadata.getJobs().values()); - } + List jobs = new ArrayList<>(MlMetadata.getMlMetadata(clusterState).getJobs().values()); return createVolatileCursorIterator(jobs); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStateRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStateRemover.java index b07b025e09e..fd4085d2020 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStateRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStateRemover.java @@ -11,10 +11,8 @@ import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.client.Client; -import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.logging.Loggers; -import org.elasticsearch.xpack.core.ml.MLMetadataField; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; @@ -24,7 +22,6 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles; import org.elasticsearch.xpack.ml.job.persistence.BatchedStateDocIdsIterator; import java.util.Arrays; -import java.util.Collections; import java.util.Deque; import java.util.List; import java.util.Objects; @@ -84,12 +81,7 @@ public class UnusedStateRemover implements MlDataRemover { } private Set getJobIds() { - ClusterState clusterState = clusterService.state(); - MlMetadata mlMetadata = clusterState.getMetaData().custom(MLMetadataField.TYPE); - if (mlMetadata != null) { - return mlMetadata.getJobs().keySet(); - } - return Collections.emptySet(); + return MlMetadata.getMlMetadata(clusterService.state()).getJobs().keySet(); } private void executeDeleteUnusedStateDocs(BulkRequestBuilder deleteUnusedStateRequestBuilder, ActionListener listener) { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSetTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSetTests.java index b685e6b961d..eba2054054c 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSetTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSetTests.java @@ -65,7 +65,7 @@ public class MachineLearningFeatureSetTests extends ESTestCase { private XPackLicenseState licenseState; @Before - public void init() throws Exception { + public void init() { commonSettings = Settings.builder() .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toAbsolutePath()) .put(MachineLearningField.AUTODETECT_PROCESS.getKey(), false) @@ -232,9 +232,28 @@ public class MachineLearningFeatureSetTests extends ESTestCase { try (XContentBuilder builder = XContentFactory.jsonBuilder()) { usage.toXContent(builder, ToXContent.EMPTY_PARAMS); source = new XContentSource(builder); - assertThat(source.getValue("jobs"), equalTo(Collections.emptyMap())); - assertThat(source.getValue("datafeeds"), equalTo(Collections.emptyMap())); } + + assertThat(source.getValue("jobs._all.count"), equalTo(0)); + assertThat(source.getValue("jobs._all.detectors.min"), equalTo(0.0)); + assertThat(source.getValue("jobs._all.detectors.max"), equalTo(0.0)); + assertThat(source.getValue("jobs._all.detectors.total"), equalTo(0.0)); + assertThat(source.getValue("jobs._all.detectors.avg"), equalTo(0.0)); + assertThat(source.getValue("jobs._all.model_size.min"), equalTo(0.0)); + assertThat(source.getValue("jobs._all.model_size.max"), equalTo(0.0)); + assertThat(source.getValue("jobs._all.model_size.total"), equalTo(0.0)); + assertThat(source.getValue("jobs._all.model_size.avg"), equalTo(0.0)); + + assertThat(source.getValue("jobs.opening"), is(nullValue())); + assertThat(source.getValue("jobs.opened"), is(nullValue())); + assertThat(source.getValue("jobs.closing"), is(nullValue())); + assertThat(source.getValue("jobs.closed"), is(nullValue())); + assertThat(source.getValue("jobs.failed"), is(nullValue())); + + assertThat(source.getValue("datafeeds._all.count"), equalTo(0)); + + assertThat(source.getValue("datafeeds.started"), is(nullValue())); + assertThat(source.getValue("datafeeds.stopped"), is(nullValue())); } private void givenJobs(List jobs, List jobsStats) { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlInitializationServiceTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlInitializationServiceTests.java index ce46139a18b..d324d17f40e 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlInitializationServiceTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlInitializationServiceTests.java @@ -10,7 +10,6 @@ import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -22,20 +21,15 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ml.MLMetadataField; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.junit.Before; -import org.mockito.Mockito; import java.net.InetAddress; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.mock.orig.Mockito.doAnswer; -import static org.elasticsearch.mock.orig.Mockito.times; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -68,7 +62,7 @@ public class MlInitializationServiceTests extends ESTestCase { when(clusterService.getClusterName()).thenReturn(CLUSTER_NAME); } - public void testInitialize() throws Exception { + public void testInitialize() { MlInitializationService initializationService = new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client); ClusterState cs = ClusterState.builder(new ClusterName("_name")) @@ -80,11 +74,10 @@ public class MlInitializationServiceTests extends ESTestCase { .build(); initializationService.clusterChanged(new ClusterChangedEvent("_source", cs, cs)); - verify(clusterService, times(1)).submitStateUpdateTask(eq("install-ml-metadata"), any()); assertThat(initializationService.getDailyMaintenanceService().isStarted(), is(true)); } - public void testInitialize_noMasterNode() throws Exception { + public void testInitialize_noMasterNode() { MlInitializationService initializationService = new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client); ClusterState cs = ClusterState.builder(new ClusterName("_name")) @@ -94,11 +87,10 @@ public class MlInitializationServiceTests extends ESTestCase { .build(); initializationService.clusterChanged(new ClusterChangedEvent("_source", cs, cs)); - verify(clusterService, times(0)).submitStateUpdateTask(eq("install-ml-metadata"), any()); assertThat(initializationService.getDailyMaintenanceService(), is(nullValue())); } - public void testInitialize_alreadyInitialized() throws Exception { + public void testInitialize_alreadyInitialized() { MlInitializationService initializationService = new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client); ClusterState cs = ClusterState.builder(new ClusterName("_name")) @@ -113,67 +105,10 @@ public class MlInitializationServiceTests extends ESTestCase { initializationService.setDailyMaintenanceService(initialDailyMaintenanceService); initializationService.clusterChanged(new ClusterChangedEvent("_source", cs, cs)); - verify(clusterService, times(0)).submitStateUpdateTask(eq("install-ml-metadata"), any()); assertSame(initialDailyMaintenanceService, initializationService.getDailyMaintenanceService()); } - public void testInitialize_onlyOnce() throws Exception { - MlInitializationService initializationService = new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client); - - ClusterState cs = ClusterState.builder(new ClusterName("_name")) - .nodes(DiscoveryNodes.builder() - .add(new DiscoveryNode("_node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9200), Version.CURRENT)) - .localNodeId("_node_id") - .masterNodeId("_node_id")) - .metaData(MetaData.builder()) - .build(); - initializationService.clusterChanged(new ClusterChangedEvent("_source", cs, cs)); - initializationService.clusterChanged(new ClusterChangedEvent("_source", cs, cs)); - - verify(clusterService, times(1)).submitStateUpdateTask(eq("install-ml-metadata"), any()); - } - - public void testInitialize_reintialiseAfterFailure() throws Exception { - MlInitializationService initializationService = new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client); - - // Fail the first cluster state update - AtomicBoolean onFailureCalled = new AtomicBoolean(false); - Mockito.doAnswer(invocation -> { - ClusterStateUpdateTask task = (ClusterStateUpdateTask) invocation.getArguments()[1]; - task.onFailure("mock a failure", new IllegalStateException()); - onFailureCalled.set(true); - return null; - }).when(clusterService).submitStateUpdateTask(eq("install-ml-metadata"), any(ClusterStateUpdateTask.class)); - - ClusterState cs = ClusterState.builder(new ClusterName("_name")) - .nodes(DiscoveryNodes.builder() - .add(new DiscoveryNode("_node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9200), Version.CURRENT)) - .localNodeId("_node_id") - .masterNodeId("_node_id")) - .metaData(MetaData.builder()) - .build(); - initializationService.clusterChanged(new ClusterChangedEvent("_source", cs, cs)); - assertTrue("Something went wrong mocking the cluster update task", onFailureCalled.get()); - verify(clusterService, times(1)).submitStateUpdateTask(eq("install-ml-metadata"), any(ClusterStateUpdateTask.class)); - - // 2nd update succeeds - AtomicReference clusterStateHolder = new AtomicReference<>(); - Mockito.doAnswer(invocation -> { - ClusterStateUpdateTask task = (ClusterStateUpdateTask) invocation.getArguments()[1]; - clusterStateHolder.set(task.execute(cs)); - return null; - }).when(clusterService).submitStateUpdateTask(eq("install-ml-metadata"), any(ClusterStateUpdateTask.class)); - - initializationService.clusterChanged(new ClusterChangedEvent("_source", cs, cs)); - assertTrue("Something went wrong mocking the sucessful cluster update task", clusterStateHolder.get() != null); - verify(clusterService, times(2)).submitStateUpdateTask(eq("install-ml-metadata"), any(ClusterStateUpdateTask.class)); - - // 3rd update won't be called as ML Metadata has been installed - initializationService.clusterChanged(new ClusterChangedEvent("_source", clusterStateHolder.get(), clusterStateHolder.get())); - verify(clusterService, times(2)).submitStateUpdateTask(eq("install-ml-metadata"), any(ClusterStateUpdateTask.class)); - } - - public void testNodeGoesFromMasterToNonMasterAndBack() throws Exception { + public void testNodeGoesFromMasterToNonMasterAndBack() { MlInitializationService initializationService = new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client); MlDailyMaintenanceService initialDailyMaintenanceService = mock(MlDailyMaintenanceService.class); initializationService.setDailyMaintenanceService(initialDailyMaintenanceService); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java index a5d3faf3234..bd722ebf8ef 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java @@ -251,11 +251,11 @@ public class DatafeedManagerTests extends ESTestCase { } public void testDatafeedTaskWaitsUntilJobIsOpened() { - PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask("job_id", "node_id", JobState.OPENING, tasksBuilder); ClusterState.Builder cs = ClusterState.builder(clusterService.state()) - .metaData(new MetaData.Builder().putCustom(MLMetadataField.TYPE, clusterService.state().getMetaData() - .custom(MLMetadataField.TYPE)).putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); + .metaData(new MetaData.Builder().putCustom(MLMetadataField.TYPE, MlMetadata.getMlMetadata(clusterService.state())) + .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); when(clusterService.state()).thenReturn(cs.build()); Consumer handler = mockConsumer(); @@ -269,8 +269,8 @@ public class DatafeedManagerTests extends ESTestCase { addJobTask("job_id", "node_id", JobState.OPENING, tasksBuilder); addJobTask("another_job", "node_id", JobState.OPENED, tasksBuilder); ClusterState.Builder anotherJobCs = ClusterState.builder(clusterService.state()) - .metaData(new MetaData.Builder().putCustom(MLMetadataField.TYPE, clusterService.state().getMetaData() - .custom(MLMetadataField.TYPE)).putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); + .metaData(new MetaData.Builder().putCustom(MLMetadataField.TYPE, MlMetadata.getMlMetadata(clusterService.state())) + .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); capturedClusterStateListener.getValue().clusterChanged(new ClusterChangedEvent("_source", anotherJobCs.build(), cs.build())); @@ -280,8 +280,8 @@ public class DatafeedManagerTests extends ESTestCase { tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask("job_id", "node_id", JobState.OPENED, tasksBuilder); ClusterState.Builder jobOpenedCs = ClusterState.builder(clusterService.state()) - .metaData(new MetaData.Builder().putCustom(MLMetadataField.TYPE, clusterService.state().getMetaData() - .custom(MLMetadataField.TYPE)).putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); + .metaData(new MetaData.Builder().putCustom(MLMetadataField.TYPE, MlMetadata.getMlMetadata(clusterService.state())) + .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); capturedClusterStateListener.getValue().clusterChanged( new ClusterChangedEvent("_source", jobOpenedCs.build(), anotherJobCs.build())); @@ -294,8 +294,8 @@ public class DatafeedManagerTests extends ESTestCase { PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask("job_id", "node_id", JobState.OPENING, tasksBuilder); ClusterState.Builder cs = ClusterState.builder(clusterService.state()) - .metaData(new MetaData.Builder().putCustom(MLMetadataField.TYPE, clusterService.state().getMetaData() - .custom(MLMetadataField.TYPE)).putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); + .metaData(new MetaData.Builder().putCustom(MLMetadataField.TYPE, MlMetadata.getMlMetadata(clusterService.state())) + .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); when(clusterService.state()).thenReturn(cs.build()); Consumer handler = mockConsumer(); @@ -308,8 +308,8 @@ public class DatafeedManagerTests extends ESTestCase { tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask("job_id", "node_id", JobState.FAILED, tasksBuilder); ClusterState.Builder updatedCs = ClusterState.builder(clusterService.state()) - .metaData(new MetaData.Builder().putCustom(MLMetadataField.TYPE, clusterService.state().getMetaData() - .custom(MLMetadataField.TYPE)).putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); + .metaData(new MetaData.Builder().putCustom(MLMetadataField.TYPE, MlMetadata.getMlMetadata(clusterService.state())) + .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); capturedClusterStateListener.getValue().clusterChanged(new ClusterChangedEvent("_source", updatedCs.build(), cs.build())); @@ -322,8 +322,8 @@ public class DatafeedManagerTests extends ESTestCase { PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask("job_id", "node_id", JobState.OPENING, tasksBuilder); ClusterState.Builder cs = ClusterState.builder(clusterService.state()) - .metaData(new MetaData.Builder().putCustom(MLMetadataField.TYPE, clusterService.state().getMetaData() - .custom(MLMetadataField.TYPE)).putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); + .metaData(new MetaData.Builder().putCustom(MLMetadataField.TYPE, MlMetadata.getMlMetadata(clusterService.state())) + .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); when(clusterService.state()).thenReturn(cs.build()); Consumer handler = mockConsumer(); @@ -340,8 +340,8 @@ public class DatafeedManagerTests extends ESTestCase { tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask("job_id", "node_id", JobState.OPENED, tasksBuilder); ClusterState.Builder updatedCs = ClusterState.builder(clusterService.state()) - .metaData(new MetaData.Builder().putCustom(MLMetadataField.TYPE, clusterService.state().getMetaData() - .custom(MLMetadataField.TYPE)).putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); + .metaData(new MetaData.Builder().putCustom(MLMetadataField.TYPE, MlMetadata.getMlMetadata(clusterService.state())) + .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); capturedClusterStateListener.getValue().clusterChanged(new ClusterChangedEvent("_source", cs.build(), updatedCs.build())); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteJobIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteJobIT.java index 778ffe6dfae..357c2bc2325 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteJobIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteJobIT.java @@ -103,7 +103,7 @@ public class DeleteJobIT extends BaseMlIntegTestCase { } private ClusterState markJobAsDeleted(String jobId, ClusterState currentState) { - MlMetadata mlMetadata = currentState.metaData().custom(MLMetadataField.TYPE); + MlMetadata mlMetadata = MlMetadata.getMlMetadata(currentState); assertNotNull(mlMetadata); MlMetadata.Builder builder = new MlMetadata.Builder(mlMetadata); @@ -116,7 +116,7 @@ public class DeleteJobIT extends BaseMlIntegTestCase { } private ClusterState removeJobFromClusterState(String jobId, ClusterState currentState) { - MlMetadata.Builder builder = new MlMetadata.Builder(currentState.metaData().custom(MLMetadataField.TYPE)); + MlMetadata.Builder builder = new MlMetadata.Builder(MlMetadata.getMlMetadata(currentState)); builder.deleteJob(jobId, currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE)); ClusterState.Builder newState = ClusterState.builder(currentState); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java index 667be5d41ea..64b3bfbab45 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java @@ -326,7 +326,7 @@ public class JobManagerTests extends ESTestCase { private ClusterState createClusterState() { ClusterState.Builder builder = ClusterState.builder(new ClusterName("_name")); - builder.metaData(MetaData.builder().putCustom(MLMetadataField.TYPE, MlMetadata.EMPTY_METADATA)); + builder.metaData(MetaData.builder()); return builder.build(); } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobProviderTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobProviderTests.java index 9fea904a99f..ef87fe392dd 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobProviderTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobProviderTests.java @@ -39,8 +39,6 @@ import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xpack.core.ml.MLMetadataField; -import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.action.util.QueryPage; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; @@ -93,7 +91,7 @@ public class JobProviderTests extends ESTestCase { AtomicReference resultHolder = new AtomicReference<>(); ClusterState cs = ClusterState.builder(new ClusterName("_name")) - .metaData(MetaData.builder().putCustom(MLMetadataField.TYPE, MlMetadata.EMPTY_METADATA).indices(ImmutableOpenMap.of())) + .metaData(MetaData.builder().indices(ImmutableOpenMap.of())) .build(); ClusterService clusterService = mock(ClusterService.class); @@ -157,7 +155,7 @@ public class JobProviderTests extends ESTestCase { .fPut(AnomalyDetectorsIndex.jobResultsAliasedName("foo"), indexMetaData).build(); ClusterState cs2 = ClusterState.builder(new ClusterName("_name")) - .metaData(MetaData.builder().putCustom(MLMetadataField.TYPE, MlMetadata.EMPTY_METADATA).indices(indexMap)).build(); + .metaData(MetaData.builder().indices(indexMap)).build(); ClusterService clusterService = mock(ClusterService.class); @@ -209,7 +207,7 @@ public class JobProviderTests extends ESTestCase { ImmutableOpenMap indexMap = ImmutableOpenMap.builder().build(); ClusterState cs = ClusterState.builder(new ClusterName("_name")) - .metaData(MetaData.builder().putCustom(MLMetadataField.TYPE, MlMetadata.EMPTY_METADATA).indices(indexMap)).build(); + .metaData(MetaData.builder().indices(indexMap)).build(); ClusterService clusterService = mock(ClusterService.class); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java index 9f3d1c779f8..f330c501b76 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java @@ -28,7 +28,6 @@ import org.elasticsearch.test.MockHttpTransport; import org.elasticsearch.test.discovery.TestZenDiscovery; import org.elasticsearch.xpack.core.XPackSettings; import org.elasticsearch.xpack.ml.LocalStateMachineLearning; -import org.elasticsearch.xpack.core.ml.MLMetadataField; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.core.ml.MachineLearningField; import org.elasticsearch.xpack.core.ml.MlMetadata; @@ -272,8 +271,7 @@ public abstract class BaseMlIntegTestCase extends ESIntegTestCase { } public static void deleteAllDatafeeds(Logger logger, Client client) throws Exception { - MetaData metaData = client.admin().cluster().prepareState().get().getState().getMetaData(); - MlMetadata mlMetadata = metaData.custom(MLMetadataField.TYPE); + MlMetadata mlMetadata = MlMetadata.getMlMetadata(client.admin().cluster().prepareState().get().getState()); try { logger.info("Closing all datafeeds (using _all)"); StopDatafeedAction.Response stopResponse = client @@ -312,8 +310,7 @@ public abstract class BaseMlIntegTestCase extends ESIntegTestCase { } public static void deleteAllJobs(Logger logger, Client client) throws Exception { - MetaData metaData = client.admin().cluster().prepareState().get().getState().getMetaData(); - MlMetadata mlMetadata = metaData.custom(MLMetadataField.TYPE); + MlMetadata mlMetadata = MlMetadata.getMlMetadata(client.admin().cluster().prepareState().get().getState()); try { CloseJobAction.Request closeRequest = new CloseJobAction.Request(MetaData.ALL);