[ML] Don't install empty ML metadata on startup (#30751)

This change is to support rolling upgrade from a pre-6.3 default
distribution (i.e. without X-Pack) to a 6.3+ default distribution
(i.e. with X-Pack).

The ML metadata is no longer eagerly added to the cluster state
as soon as the master node has X-Pack available.  Instead, it
is added when the first ML job is created.

As a result all methods that get the ML metadata need to be able
to handle the situation where there is no ML metadata in the
current cluster state.  They do this by behaving as though an
empty ML metadata was present.  This logic is encapsulated by
always asking for the current ML metadata using a static method
on the MlMetadata class.

Relates #30731
This commit is contained in:
David Roberts 2018-05-21 14:29:45 +01:00 committed by GitHub
parent e639036ef1
commit eaf672f612
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
34 changed files with 99 additions and 256 deletions

View File

@ -9,6 +9,7 @@ import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.AbstractDiffable;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.Diff;
import org.elasticsearch.cluster.DiffableUtils;
import org.elasticsearch.cluster.NamedDiff;
@ -467,6 +468,14 @@ public class MlMetadata implements MetaData.Custom {
}
}
public static MlMetadata getMlMetadata(ClusterState state) {
MlMetadata mlMetadata = (state == null) ? null : state.getMetaData().custom(MLMetadataField.TYPE);
if (mlMetadata == null) {
return EMPTY_METADATA;
}
return mlMetadata;
}
public static class JobAlreadyMarkedAsDeletedException extends RuntimeException {
}
}

View File

@ -6,7 +6,6 @@
package org.elasticsearch.xpack.core.ml.job.persistence;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.xpack.core.ml.MLMetadataField;
import org.elasticsearch.xpack.core.ml.MlMetadata;
/**
@ -47,8 +46,7 @@ public final class AnomalyDetectorsIndex {
* @return The index name
*/
public static String getPhysicalIndexFromState(ClusterState state, String jobId) {
MlMetadata meta = state.getMetaData().custom(MLMetadataField.TYPE);
return meta.getJobs().get(jobId).getResultsIndexName();
return MlMetadata.getMlMetadata(state).getJobs().get(jobId).getResultsIndexName();
}
/**

View File

@ -23,7 +23,6 @@ import org.elasticsearch.xpack.core.XPackFeatureSet;
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.XPackField;
import org.elasticsearch.xpack.core.ml.MLMetadataField;
import org.elasticsearch.xpack.core.ml.MachineLearningFeatureSetUsage;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction;
@ -132,15 +131,7 @@ public class MachineLearningFeatureSet implements XPackFeatureSet {
@Override
public void usage(ActionListener<XPackFeatureSet.Usage> listener) {
ClusterState state = clusterService.state();
MlMetadata mlMetadata = state.getMetaData().custom(MLMetadataField.TYPE);
// Handle case when usage is called but MlMetadata has not been installed yet
if (mlMetadata == null) {
listener.onResponse(new MachineLearningFeatureSetUsage(available(), enabled,
Collections.emptyMap(), Collections.emptyMap()));
} else {
new Retriever(client, mlMetadata, available(), enabled()).execute(listener);
}
new Retriever(client, MlMetadata.getMlMetadata(state), available(), enabled()).execute(listener);
}
public static class Retriever {

View File

@ -13,7 +13,6 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.MLMetadataField;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
@ -90,8 +89,7 @@ public class MlAssignmentNotifier extends AbstractComponent implements ClusterSt
}
} else if (StartDatafeedAction.TASK_NAME.equals(currentTask.getTaskName())) {
String datafeedId = ((StartDatafeedAction.DatafeedParams) currentTask.getParams()).getDatafeedId();
MlMetadata mlMetadata = event.state().getMetaData().custom(MLMetadataField.TYPE);
DatafeedConfig datafeedConfig = mlMetadata.getDatafeed(datafeedId);
DatafeedConfig datafeedConfig = MlMetadata.getMlMetadata(event.state()).getDatafeed(datafeedId);
if (currentAssignment.getExecutorNode() == null) {
String msg = "No node found to start datafeed [" + datafeedId +"]. Reasons [" +
currentAssignment.getExplanation() + "]";

View File

@ -7,20 +7,13 @@ package org.elasticsearch.xpack.ml;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.MLMetadataField;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import java.util.concurrent.atomic.AtomicBoolean;
class MlInitializationService extends AbstractComponent implements ClusterStateListener {
@ -28,8 +21,6 @@ class MlInitializationService extends AbstractComponent implements ClusterStateL
private final ClusterService clusterService;
private final Client client;
private final AtomicBoolean installMlMetadataCheck = new AtomicBoolean(false);
private volatile MlDailyMaintenanceService mlDailyMaintenanceService;
MlInitializationService(Settings settings, ThreadPool threadPool, ClusterService clusterService, Client client) {
@ -48,45 +39,12 @@ class MlInitializationService extends AbstractComponent implements ClusterStateL
}
if (event.localNodeMaster()) {
MetaData metaData = event.state().metaData();
installMlMetadata(metaData);
installDailyMaintenanceService();
} else {
uninstallDailyMaintenanceService();
}
}
private void installMlMetadata(MetaData metaData) {
if (metaData.custom(MLMetadataField.TYPE) == null) {
if (installMlMetadataCheck.compareAndSet(false, true)) {
threadPool.executor(ThreadPool.Names.GENERIC).execute(() ->
clusterService.submitStateUpdateTask("install-ml-metadata", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
// If the metadata has been added already don't try to update
if (currentState.metaData().custom(MLMetadataField.TYPE) != null) {
return currentState;
}
ClusterState.Builder builder = new ClusterState.Builder(currentState);
MetaData.Builder metadataBuilder = MetaData.builder(currentState.metaData());
metadataBuilder.putCustom(MLMetadataField.TYPE, MlMetadata.EMPTY_METADATA);
builder.metaData(metadataBuilder.build());
return builder.build();
}
@Override
public void onFailure(String source, Exception e) {
installMlMetadataCheck.set(false);
logger.error("unable to install ml metadata", e);
}
})
);
}
} else {
installMlMetadataCheck.set(false);
}
}
private void installDailyMaintenanceService() {
if (mlDailyMaintenanceService == null) {
mlDailyMaintenanceService = new MlDailyMaintenanceService(clusterService.getClusterName(), threadPool, client);

View File

@ -27,7 +27,6 @@ import org.elasticsearch.discovery.MasterNotDiscoveredException;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ml.MLMetadataField;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.action.CloseJobAction;
import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction;
@ -92,8 +91,7 @@ public class TransportCloseJobAction extends TransportTasksAction<TransportOpenJ
static void resolveAndValidateJobId(CloseJobAction.Request request, ClusterState state, List<String> openJobIds,
List<String> closingJobIds) {
PersistentTasksCustomMetaData tasksMetaData = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
MlMetadata maybeNull = state.metaData().custom(MLMetadataField.TYPE);
final MlMetadata mlMetadata = (maybeNull == null) ? MlMetadata.EMPTY_METADATA : maybeNull;
final MlMetadata mlMetadata = MlMetadata.getMlMetadata(state);
List<String> failedJobs = new ArrayList<>();
@ -107,7 +105,7 @@ public class TransportCloseJobAction extends TransportTasksAction<TransportOpenJ
};
Set<String> expandedJobIds = mlMetadata.expandJobIds(request.getJobId(), request.allowNoJobs());
expandedJobIds.stream().forEach(jobIdProcessor::accept);
expandedJobIds.forEach(jobIdProcessor::accept);
if (request.isForce() == false && failedJobs.size() > 0) {
if (expandedJobIds.size() == 1) {
throw ExceptionsHelper.conflictStatusException("cannot close job [{}] because it failed, use force close",

View File

@ -119,8 +119,8 @@ public class TransportDeleteDatafeedAction extends TransportMasterNodeAction<Del
}
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
MlMetadata currentMetadata = currentState.getMetaData().custom(MLMetadataField.TYPE);
public ClusterState execute(ClusterState currentState) {
MlMetadata currentMetadata = MlMetadata.getMlMetadata(currentState);
PersistentTasksCustomMetaData persistentTasks =
currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
MlMetadata newMetadata = new MlMetadata.Builder(currentMetadata)

View File

@ -24,7 +24,6 @@ import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ml.action.DeleteFilterAction;
import org.elasticsearch.xpack.core.ml.MLMetadataField;
import org.elasticsearch.xpack.core.ml.MlMetaIndex;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.job.config.Detector;
@ -60,8 +59,7 @@ public class TransportDeleteFilterAction extends HandledTransportAction<DeleteFi
final String filterId = request.getFilterId();
ClusterState state = clusterService.state();
MlMetadata currentMlMetadata = state.metaData().custom(MLMetadataField.TYPE);
Map<String, Job> jobs = currentMlMetadata.getJobs();
Map<String, Job> jobs = MlMetadata.getMlMetadata(state).getJobs();
List<String> currentlyUsedBy = new ArrayList<>();
for (Job job : jobs.values()) {
List<Detector> detectors = job.getAnalysisConfig().getDetectors();

View File

@ -200,10 +200,9 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJo
void markJobAsDeleting(String jobId, ActionListener<Boolean> listener, boolean force) {
clusterService.submitStateUpdateTask("mark-job-as-deleted", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
MlMetadata currentMlMetadata = currentState.metaData().custom(MLMetadataField.TYPE);
public ClusterState execute(ClusterState currentState) {
PersistentTasksCustomMetaData tasks = currentState.metaData().custom(PersistentTasksCustomMetaData.TYPE);
MlMetadata.Builder builder = new MlMetadata.Builder(currentMlMetadata);
MlMetadata.Builder builder = new MlMetadata.Builder(MlMetadata.getMlMetadata(currentState));
builder.markJobAsDeleted(jobId, tasks, force);
return buildNewClusterState(currentState, builder);
}
@ -248,11 +247,7 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJo
}
static boolean jobIsDeletedFromState(String jobId, ClusterState clusterState) {
MlMetadata metadata = clusterState.metaData().custom(MLMetadataField.TYPE);
if (metadata == null) {
return true;
}
return !metadata.getJobs().containsKey(jobId);
return !MlMetadata.getMlMetadata(clusterState).getJobs().containsKey(jobId);
}
private static ClusterState buildNewClusterState(ClusterState currentState, MlMetadata.Builder builder) {

View File

@ -56,8 +56,8 @@ public class TransportFinalizeJobExecutionAction extends TransportMasterNodeActi
logger.debug("finalizing jobs [{}]", jobIdString);
clusterService.submitStateUpdateTask(source, new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
MlMetadata mlMetadata = currentState.metaData().custom(MLMetadataField.TYPE);
public ClusterState execute(ClusterState currentState) {
MlMetadata mlMetadata = MlMetadata.getMlMetadata(currentState);
MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(mlMetadata);
Date finishedTime = new Date();

View File

@ -15,7 +15,6 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ml.MLMetadataField;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.action.GetCalendarEventsAction;
import org.elasticsearch.xpack.core.ml.action.GetCalendarsAction;
@ -70,7 +69,7 @@ public class TransportGetCalendarEventsAction extends HandledTransportAction<Get
if (request.getJobId() != null) {
ClusterState state = clusterService.state();
MlMetadata currentMlMetadata = state.metaData().custom(MLMetadataField.TYPE);
MlMetadata currentMlMetadata = MlMetadata.getMlMetadata(state);
List<String> jobGroups;
String requestId = request.getJobId();

View File

@ -18,7 +18,6 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ml.action.GetDatafeedsAction;
import org.elasticsearch.xpack.core.ml.MLMetadataField;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.action.util.QueryPage;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
@ -52,10 +51,7 @@ public class TransportGetDatafeedsAction extends TransportMasterNodeReadAction<G
ActionListener<GetDatafeedsAction.Response> listener) throws Exception {
logger.debug("Get datafeed '{}'", request.getDatafeedId());
MlMetadata mlMetadata = state.metaData().custom(MLMetadataField.TYPE);
if (mlMetadata == null) {
mlMetadata = MlMetadata.EMPTY_METADATA;
}
MlMetadata mlMetadata = MlMetadata.getMlMetadata(state);
Set<String> expandedDatafeedIds = mlMetadata.expandDatafeedIds(request.getDatafeedId(), request.allowNoDatafeeds());
List<DatafeedConfig> datafeedConfigs = new ArrayList<>();
for (String expandedDatafeedId : expandedDatafeedIds) {

View File

@ -18,7 +18,6 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ml.MLMetadataField;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction;
import org.elasticsearch.xpack.core.ml.action.util.QueryPage;
@ -56,11 +55,7 @@ public class TransportGetDatafeedsStatsAction extends TransportMasterNodeReadAct
ActionListener<GetDatafeedsStatsAction.Response> listener) throws Exception {
logger.debug("Get stats for datafeed '{}'", request.getDatafeedId());
MlMetadata mlMetadata = state.metaData().custom(MLMetadataField.TYPE);
if (mlMetadata == null) {
mlMetadata = MlMetadata.EMPTY_METADATA;
}
MlMetadata mlMetadata = MlMetadata.getMlMetadata(state);
Set<String> expandedDatafeedIds = mlMetadata.expandDatafeedIds(request.getDatafeedId(), request.allowNoDatafeeds());
PersistentTasksCustomMetaData tasksInProgress = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);

View File

@ -23,7 +23,6 @@ import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ml.MLMetadataField;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction;
import org.elasticsearch.xpack.core.ml.action.util.QueryPage;
@ -69,8 +68,7 @@ public class TransportGetJobsStatsAction extends TransportTasksAction<TransportO
@Override
protected void doExecute(Task task, GetJobsStatsAction.Request request, ActionListener<GetJobsStatsAction.Response> listener) {
MlMetadata clusterMlMetadata = clusterService.state().metaData().custom(MLMetadataField.TYPE);
MlMetadata mlMetadata = (clusterMlMetadata == null) ? MlMetadata.EMPTY_METADATA : clusterMlMetadata;
MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterService.state());
request.setExpandedJobsIds(new ArrayList<>(mlMetadata.expandJobIds(request.getJobId(), request.allowNoJobs())));
ActionListener<GetJobsStatsAction.Response> finalListener = listener;
listener = ActionListener.wrap(response -> gatherStatsForClosedJobs(mlMetadata,

View File

@ -49,7 +49,6 @@ import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.XPackField;
import org.elasticsearch.xpack.core.ml.MLMetadataField;
import org.elasticsearch.xpack.core.ml.MlMetaIndex;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
@ -163,7 +162,7 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
continue;
}
MlMetadata mlMetadata = clusterState.getMetaData().custom(MLMetadataField.TYPE);
MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState);
Job job = mlMetadata.getJobs().get(jobId);
Set<String> compatibleJobTypes = Job.getCompatibleJobTypes(node.getVersion());
if (compatibleJobTypes.contains(job.getJobType()) == false) {
@ -474,8 +473,7 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
// Step 3. Update established model memory for pre-6.1 jobs that haven't had it set
ActionListener<Boolean> missingMappingsListener = ActionListener.wrap(
response -> {
MlMetadata mlMetadata = clusterService.state().getMetaData().custom(MLMetadataField.TYPE);
Job job = mlMetadata.getJobs().get(jobParams.getJobId());
Job job = MlMetadata.getMlMetadata(clusterService.state()).getJobs().get(jobParams.getJobId());
if (job != null) {
Version jobVersion = job.getJobVersion();
Long jobEstablishedModelMemory = job.getEstablishedModelMemory();
@ -650,8 +648,7 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
public void validate(OpenJobAction.JobParams params, ClusterState clusterState) {
// If we already know that we can't find an ml node because all ml nodes are running at capacity or
// simply because there are no ml nodes in the cluster then we fail quickly here:
MlMetadata mlMetadata = clusterState.metaData().custom(MLMetadataField.TYPE);
TransportOpenJobAction.validate(params.getJobId(), mlMetadata);
TransportOpenJobAction.validate(params.getJobId(), MlMetadata.getMlMetadata(clusterState));
PersistentTasksCustomMetaData.Assignment assignment = selectLeastLoadedMlNode(params.getJobId(), clusterState,
maxConcurrentJobAllocations, fallbackMaxNumberOfOpenJobs, maxMachineMemoryPercent, logger);
if (assignment.getExecutorNode() == null) {

View File

@ -17,7 +17,6 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.ml.MLMetadataField;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.action.PreviewDatafeedAction;
import org.elasticsearch.xpack.core.ml.datafeed.ChunkingConfig;
@ -52,7 +51,7 @@ public class TransportPreviewDatafeedAction extends HandledTransportAction<Previ
@Override
protected void doExecute(PreviewDatafeedAction.Request request, ActionListener<PreviewDatafeedAction.Response> listener) {
MlMetadata mlMetadata = clusterService.state().getMetaData().custom(MLMetadataField.TYPE);
MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterService.state());
DatafeedConfig datafeed = mlMetadata.getDatafeed(request.getDatafeedId());
if (datafeed == null) {
throw ExceptionsHelper.missingDatafeedException(request.getDatafeedId());

View File

@ -14,9 +14,7 @@ import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ToXContent;
@ -26,16 +24,12 @@ import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ml.action.PutCalendarAction;
import org.elasticsearch.xpack.core.ml.MLMetadataField;
import org.elasticsearch.xpack.core.ml.MlMetaIndex;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.calendars.Calendar;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
@ -43,17 +37,15 @@ import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
public class TransportPutCalendarAction extends HandledTransportAction<PutCalendarAction.Request, PutCalendarAction.Response> {
private final Client client;
private final ClusterService clusterService;
@Inject
public TransportPutCalendarAction(Settings settings, ThreadPool threadPool,
TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
Client client, ClusterService clusterService) {
Client client) {
super(settings, PutCalendarAction.NAME, threadPool, transportService, actionFilters,
indexNameExpressionResolver, PutCalendarAction.Request::new);
this.client = client;
this.clusterService = clusterService;
}
@Override

View File

@ -141,7 +141,7 @@ public class TransportPutDatafeedAction extends TransportMasterNodeAction<PutDat
}
private ClusterState putDatafeed(PutDatafeedAction.Request request, ClusterState clusterState) {
MlMetadata currentMetadata = clusterState.getMetaData().custom(MLMetadataField.TYPE);
MlMetadata currentMetadata = MlMetadata.getMlMetadata(clusterState);
MlMetadata newMetadata = new MlMetadata.Builder(currentMetadata)
.putDatafeed(request.getDatafeed(), threadPool.getThreadContext()).build();
return ClusterState.builder(clusterState).metaData(

View File

@ -130,7 +130,7 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction<Star
};
// Verify data extractor factory can be created, then start persistent task
MlMetadata mlMetadata = state.metaData().custom(MLMetadataField.TYPE);
MlMetadata mlMetadata = MlMetadata.getMlMetadata(state);
PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
validate(params.getDatafeedId(), mlMetadata, tasks);
DatafeedConfig datafeed = mlMetadata.getDatafeed(params.getDatafeedId());
@ -221,9 +221,8 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction<Star
@Override
public void validate(StartDatafeedAction.DatafeedParams params, ClusterState clusterState) {
MlMetadata mlMetadata = clusterState.metaData().custom(MLMetadataField.TYPE);
PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
TransportStartDatafeedAction.validate(params.getDatafeedId(), mlMetadata, tasks);
TransportStartDatafeedAction.validate(params.getDatafeedId(), MlMetadata.getMlMetadata(clusterState), tasks);
new DatafeedNodeSelector(clusterState, resolver, params.getDatafeedId()).checkDatafeedTaskCanBeCreated();
}

View File

@ -130,7 +130,7 @@ public class TransportStopDatafeedAction extends TransportTasksAction<TransportS
new ActionListenerResponseHandler<>(listener, StopDatafeedAction.Response::new));
}
} else {
MlMetadata mlMetadata = state.getMetaData().custom(MLMetadataField.TYPE);
MlMetadata mlMetadata = MlMetadata.getMlMetadata(state);
PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
List<String> startedDatafeeds = new ArrayList<>();

View File

@ -16,7 +16,6 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ml.MLMetadataField;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.action.PutCalendarAction;
import org.elasticsearch.xpack.core.ml.action.UpdateCalendarJobAction;
@ -48,8 +47,7 @@ public class TransportUpdateCalendarJobAction extends HandledTransportAction<Upd
@Override
protected void doExecute(UpdateCalendarJobAction.Request request, ActionListener<PutCalendarAction.Response> listener) {
ClusterState clusterState = clusterService.state();
MlMetadata maybeNullMetaData = clusterState.getMetaData().custom(MLMetadataField.TYPE);
final MlMetadata mlMetadata = maybeNullMetaData == null ? MlMetadata.EMPTY_METADATA : maybeNullMetaData;
final MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState);
Set<String> jobIdsToAdd = Strings.tokenizeByCommaToSet(request.getJobIdsToAddExpression());
Set<String> jobIdsToRemove = Strings.tokenizeByCommaToSet(request.getJobIdsToRemoveExpression());

View File

@ -63,9 +63,9 @@ public class TransportUpdateDatafeedAction extends TransportMasterNodeAction<Upd
}
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
public ClusterState execute(ClusterState currentState) {
DatafeedUpdate update = request.getUpdate();
MlMetadata currentMetadata = currentState.getMetaData().custom(MLMetadataField.TYPE);
MlMetadata currentMetadata = MlMetadata.getMlMetadata(currentState);
PersistentTasksCustomMetaData persistentTasks =
currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
MlMetadata newMetadata = new MlMetadata.Builder(currentMetadata)

View File

@ -20,7 +20,6 @@ import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.MLMetadataField;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.action.CloseJobAction;
import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
@ -80,10 +79,7 @@ public class DatafeedManager extends AbstractComponent {
public void run(TransportStartDatafeedAction.DatafeedTask task, Consumer<Exception> taskHandler) {
String datafeedId = task.getDatafeedId();
ClusterState state = clusterService.state();
MlMetadata mlMetadata = state.metaData().custom(MLMetadataField.TYPE);
if (mlMetadata == null) {
mlMetadata = MlMetadata.EMPTY_METADATA;
}
MlMetadata mlMetadata = MlMetadata.getMlMetadata(state);
DatafeedConfig datafeed = mlMetadata.getDatafeed(datafeedId);
Job job = mlMetadata.getJobs().get(datafeed.getJobId());

View File

@ -12,7 +12,6 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.xpack.core.ml.MLMetadataField;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
@ -33,7 +32,7 @@ public class DatafeedNodeSelector {
private final IndexNameExpressionResolver resolver;
public DatafeedNodeSelector(ClusterState clusterState, IndexNameExpressionResolver resolver, String datafeedId) {
MlMetadata mlMetadata = Objects.requireNonNull(clusterState.metaData().custom(MLMetadataField.TYPE));
MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState);
PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
this.datafeed = mlMetadata.getDatafeed(datafeedId);
this.jobTask = MlMetadata.getJobTask(datafeed.getJobId(), tasks);

View File

@ -133,8 +133,7 @@ public class JobManager extends AbstractComponent {
* @throws ResourceNotFoundException if no job matches {@code jobId}
*/
public static Job getJobOrThrowIfUnknown(String jobId, ClusterState clusterState) {
MlMetadata mlMetadata = clusterState.getMetaData().custom(MLMetadataField.TYPE);
Job job = (mlMetadata == null) ? null : mlMetadata.getJobs().get(jobId);
Job job = MlMetadata.getMlMetadata(clusterState).getJobs().get(jobId);
if (job == null) {
throw ExceptionsHelper.missingJobException(jobId);
}
@ -142,11 +141,7 @@ public class JobManager extends AbstractComponent {
}
private Set<String> expandJobIds(String expression, boolean allowNoJobs, ClusterState clusterState) {
MlMetadata mlMetadata = clusterState.getMetaData().custom(MLMetadataField.TYPE);
if (mlMetadata == null) {
mlMetadata = MlMetadata.EMPTY_METADATA;
}
return mlMetadata.expandJobIds(expression, allowNoJobs);
return MlMetadata.getMlMetadata(clusterState).expandJobIds(expression, allowNoJobs);
}
/**
@ -160,7 +155,7 @@ public class JobManager extends AbstractComponent {
*/
public QueryPage<Job> expandJobs(String expression, boolean allowNoJobs, ClusterState clusterState) {
Set<String> expandedJobIds = expandJobIds(expression, allowNoJobs, clusterState);
MlMetadata mlMetadata = clusterState.getMetaData().custom(MLMetadataField.TYPE);
MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState);
List<Job> jobs = new ArrayList<>();
for (String expandedJobId : expandedJobIds) {
jobs.add(mlMetadata.getJobs().get(expandedJobId));
@ -188,8 +183,8 @@ public class JobManager extends AbstractComponent {
DEPRECATION_LOGGER.deprecated("Creating jobs with delimited data format is deprecated. Please use xcontent instead.");
}
MlMetadata currentMlMetadata = state.metaData().custom(MLMetadataField.TYPE);
if (currentMlMetadata != null && currentMlMetadata.getJobs().containsKey(job.getId())) {
MlMetadata currentMlMetadata = MlMetadata.getMlMetadata(state);
if (currentMlMetadata.getJobs().containsKey(job.getId())) {
actionListener.onFailure(ExceptionsHelper.jobAlreadyExists(job.getId()));
return;
}
@ -469,8 +464,8 @@ public class JobManager extends AbstractComponent {
}
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
MlMetadata currentMlMetadata = currentState.metaData().custom(MLMetadataField.TYPE);
public ClusterState execute(ClusterState currentState) {
MlMetadata currentMlMetadata = MlMetadata.getMlMetadata(currentState);
if (currentMlMetadata.getJobs().containsKey(jobId) == false) {
// We wouldn't have got here if the job never existed so
// the Job must have been deleted by another action.
@ -560,8 +555,7 @@ public class JobManager extends AbstractComponent {
}
private static MlMetadata.Builder createMlMetadataBuilder(ClusterState currentState) {
MlMetadata currentMlMetadata = currentState.metaData().custom(MLMetadataField.TYPE);
return new MlMetadata.Builder(currentMlMetadata);
return new MlMetadata.Builder(MlMetadata.getMlMetadata(currentState));
}
private static ClusterState buildNewClusterState(ClusterState currentState, MlMetadata.Builder builder) {

View File

@ -11,7 +11,6 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.xpack.core.ml.MLMetadataField;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.results.Result;
@ -61,12 +60,8 @@ abstract class AbstractExpiredJobDataRemover implements MlDataRemover {
}
private Iterator<Job> newJobIterator() {
List<Job> jobs = new ArrayList<>();
ClusterState clusterState = clusterService.state();
MlMetadata mlMetadata = clusterState.getMetaData().custom(MLMetadataField.TYPE);
if (mlMetadata != null) {
jobs.addAll(mlMetadata.getJobs().values());
}
List<Job> jobs = new ArrayList<>(MlMetadata.getMlMetadata(clusterState).getJobs().values());
return createVolatileCursorIterator(jobs);
}

View File

@ -11,10 +11,8 @@ import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.xpack.core.ml.MLMetadataField;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
@ -24,7 +22,6 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles;
import org.elasticsearch.xpack.ml.job.persistence.BatchedStateDocIdsIterator;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.Objects;
@ -84,12 +81,7 @@ public class UnusedStateRemover implements MlDataRemover {
}
private Set<String> getJobIds() {
ClusterState clusterState = clusterService.state();
MlMetadata mlMetadata = clusterState.getMetaData().custom(MLMetadataField.TYPE);
if (mlMetadata != null) {
return mlMetadata.getJobs().keySet();
}
return Collections.emptySet();
return MlMetadata.getMlMetadata(clusterService.state()).getJobs().keySet();
}
private void executeDeleteUnusedStateDocs(BulkRequestBuilder deleteUnusedStateRequestBuilder, ActionListener<Boolean> listener) {

View File

@ -65,7 +65,7 @@ public class MachineLearningFeatureSetTests extends ESTestCase {
private XPackLicenseState licenseState;
@Before
public void init() throws Exception {
public void init() {
commonSettings = Settings.builder()
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toAbsolutePath())
.put(MachineLearningField.AUTODETECT_PROCESS.getKey(), false)
@ -232,9 +232,28 @@ public class MachineLearningFeatureSetTests extends ESTestCase {
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
usage.toXContent(builder, ToXContent.EMPTY_PARAMS);
source = new XContentSource(builder);
assertThat(source.getValue("jobs"), equalTo(Collections.emptyMap()));
assertThat(source.getValue("datafeeds"), equalTo(Collections.emptyMap()));
}
assertThat(source.getValue("jobs._all.count"), equalTo(0));
assertThat(source.getValue("jobs._all.detectors.min"), equalTo(0.0));
assertThat(source.getValue("jobs._all.detectors.max"), equalTo(0.0));
assertThat(source.getValue("jobs._all.detectors.total"), equalTo(0.0));
assertThat(source.getValue("jobs._all.detectors.avg"), equalTo(0.0));
assertThat(source.getValue("jobs._all.model_size.min"), equalTo(0.0));
assertThat(source.getValue("jobs._all.model_size.max"), equalTo(0.0));
assertThat(source.getValue("jobs._all.model_size.total"), equalTo(0.0));
assertThat(source.getValue("jobs._all.model_size.avg"), equalTo(0.0));
assertThat(source.getValue("jobs.opening"), is(nullValue()));
assertThat(source.getValue("jobs.opened"), is(nullValue()));
assertThat(source.getValue("jobs.closing"), is(nullValue()));
assertThat(source.getValue("jobs.closed"), is(nullValue()));
assertThat(source.getValue("jobs.failed"), is(nullValue()));
assertThat(source.getValue("datafeeds._all.count"), equalTo(0));
assertThat(source.getValue("datafeeds.started"), is(nullValue()));
assertThat(source.getValue("datafeeds.stopped"), is(nullValue()));
}
private void givenJobs(List<Job> jobs, List<GetJobsStatsAction.Response.JobStats> jobsStats) {

View File

@ -10,7 +10,6 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
@ -22,20 +21,15 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.MLMetadataField;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.junit.Before;
import org.mockito.Mockito;
import java.net.InetAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.mock.orig.Mockito.doAnswer;
import static org.elasticsearch.mock.orig.Mockito.times;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ -68,7 +62,7 @@ public class MlInitializationServiceTests extends ESTestCase {
when(clusterService.getClusterName()).thenReturn(CLUSTER_NAME);
}
public void testInitialize() throws Exception {
public void testInitialize() {
MlInitializationService initializationService = new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client);
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
@ -80,11 +74,10 @@ public class MlInitializationServiceTests extends ESTestCase {
.build();
initializationService.clusterChanged(new ClusterChangedEvent("_source", cs, cs));
verify(clusterService, times(1)).submitStateUpdateTask(eq("install-ml-metadata"), any());
assertThat(initializationService.getDailyMaintenanceService().isStarted(), is(true));
}
public void testInitialize_noMasterNode() throws Exception {
public void testInitialize_noMasterNode() {
MlInitializationService initializationService = new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client);
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
@ -94,11 +87,10 @@ public class MlInitializationServiceTests extends ESTestCase {
.build();
initializationService.clusterChanged(new ClusterChangedEvent("_source", cs, cs));
verify(clusterService, times(0)).submitStateUpdateTask(eq("install-ml-metadata"), any());
assertThat(initializationService.getDailyMaintenanceService(), is(nullValue()));
}
public void testInitialize_alreadyInitialized() throws Exception {
public void testInitialize_alreadyInitialized() {
MlInitializationService initializationService = new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client);
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
@ -113,67 +105,10 @@ public class MlInitializationServiceTests extends ESTestCase {
initializationService.setDailyMaintenanceService(initialDailyMaintenanceService);
initializationService.clusterChanged(new ClusterChangedEvent("_source", cs, cs));
verify(clusterService, times(0)).submitStateUpdateTask(eq("install-ml-metadata"), any());
assertSame(initialDailyMaintenanceService, initializationService.getDailyMaintenanceService());
}
public void testInitialize_onlyOnce() throws Exception {
MlInitializationService initializationService = new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client);
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
.nodes(DiscoveryNodes.builder()
.add(new DiscoveryNode("_node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9200), Version.CURRENT))
.localNodeId("_node_id")
.masterNodeId("_node_id"))
.metaData(MetaData.builder())
.build();
initializationService.clusterChanged(new ClusterChangedEvent("_source", cs, cs));
initializationService.clusterChanged(new ClusterChangedEvent("_source", cs, cs));
verify(clusterService, times(1)).submitStateUpdateTask(eq("install-ml-metadata"), any());
}
public void testInitialize_reintialiseAfterFailure() throws Exception {
MlInitializationService initializationService = new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client);
// Fail the first cluster state update
AtomicBoolean onFailureCalled = new AtomicBoolean(false);
Mockito.doAnswer(invocation -> {
ClusterStateUpdateTask task = (ClusterStateUpdateTask) invocation.getArguments()[1];
task.onFailure("mock a failure", new IllegalStateException());
onFailureCalled.set(true);
return null;
}).when(clusterService).submitStateUpdateTask(eq("install-ml-metadata"), any(ClusterStateUpdateTask.class));
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
.nodes(DiscoveryNodes.builder()
.add(new DiscoveryNode("_node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9200), Version.CURRENT))
.localNodeId("_node_id")
.masterNodeId("_node_id"))
.metaData(MetaData.builder())
.build();
initializationService.clusterChanged(new ClusterChangedEvent("_source", cs, cs));
assertTrue("Something went wrong mocking the cluster update task", onFailureCalled.get());
verify(clusterService, times(1)).submitStateUpdateTask(eq("install-ml-metadata"), any(ClusterStateUpdateTask.class));
// 2nd update succeeds
AtomicReference<ClusterState> clusterStateHolder = new AtomicReference<>();
Mockito.doAnswer(invocation -> {
ClusterStateUpdateTask task = (ClusterStateUpdateTask) invocation.getArguments()[1];
clusterStateHolder.set(task.execute(cs));
return null;
}).when(clusterService).submitStateUpdateTask(eq("install-ml-metadata"), any(ClusterStateUpdateTask.class));
initializationService.clusterChanged(new ClusterChangedEvent("_source", cs, cs));
assertTrue("Something went wrong mocking the sucessful cluster update task", clusterStateHolder.get() != null);
verify(clusterService, times(2)).submitStateUpdateTask(eq("install-ml-metadata"), any(ClusterStateUpdateTask.class));
// 3rd update won't be called as ML Metadata has been installed
initializationService.clusterChanged(new ClusterChangedEvent("_source", clusterStateHolder.get(), clusterStateHolder.get()));
verify(clusterService, times(2)).submitStateUpdateTask(eq("install-ml-metadata"), any(ClusterStateUpdateTask.class));
}
public void testNodeGoesFromMasterToNonMasterAndBack() throws Exception {
public void testNodeGoesFromMasterToNonMasterAndBack() {
MlInitializationService initializationService = new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client);
MlDailyMaintenanceService initialDailyMaintenanceService = mock(MlDailyMaintenanceService.class);
initializationService.setDailyMaintenanceService(initialDailyMaintenanceService);

View File

@ -254,8 +254,8 @@ public class DatafeedManagerTests extends ESTestCase {
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
addJobTask("job_id", "node_id", JobState.OPENING, tasksBuilder);
ClusterState.Builder cs = ClusterState.builder(clusterService.state())
.metaData(new MetaData.Builder().putCustom(MLMetadataField.TYPE, clusterService.state().getMetaData()
.custom(MLMetadataField.TYPE)).putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()));
.metaData(new MetaData.Builder().putCustom(MLMetadataField.TYPE, MlMetadata.getMlMetadata(clusterService.state()))
.putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()));
when(clusterService.state()).thenReturn(cs.build());
Consumer<Exception> handler = mockConsumer();
@ -269,8 +269,8 @@ public class DatafeedManagerTests extends ESTestCase {
addJobTask("job_id", "node_id", JobState.OPENING, tasksBuilder);
addJobTask("another_job", "node_id", JobState.OPENED, tasksBuilder);
ClusterState.Builder anotherJobCs = ClusterState.builder(clusterService.state())
.metaData(new MetaData.Builder().putCustom(MLMetadataField.TYPE, clusterService.state().getMetaData()
.custom(MLMetadataField.TYPE)).putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()));
.metaData(new MetaData.Builder().putCustom(MLMetadataField.TYPE, MlMetadata.getMlMetadata(clusterService.state()))
.putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()));
capturedClusterStateListener.getValue().clusterChanged(new ClusterChangedEvent("_source", anotherJobCs.build(), cs.build()));
@ -280,8 +280,8 @@ public class DatafeedManagerTests extends ESTestCase {
tasksBuilder = PersistentTasksCustomMetaData.builder();
addJobTask("job_id", "node_id", JobState.OPENED, tasksBuilder);
ClusterState.Builder jobOpenedCs = ClusterState.builder(clusterService.state())
.metaData(new MetaData.Builder().putCustom(MLMetadataField.TYPE, clusterService.state().getMetaData()
.custom(MLMetadataField.TYPE)).putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()));
.metaData(new MetaData.Builder().putCustom(MLMetadataField.TYPE, MlMetadata.getMlMetadata(clusterService.state()))
.putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()));
capturedClusterStateListener.getValue().clusterChanged(
new ClusterChangedEvent("_source", jobOpenedCs.build(), anotherJobCs.build()));
@ -294,8 +294,8 @@ public class DatafeedManagerTests extends ESTestCase {
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
addJobTask("job_id", "node_id", JobState.OPENING, tasksBuilder);
ClusterState.Builder cs = ClusterState.builder(clusterService.state())
.metaData(new MetaData.Builder().putCustom(MLMetadataField.TYPE, clusterService.state().getMetaData()
.custom(MLMetadataField.TYPE)).putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()));
.metaData(new MetaData.Builder().putCustom(MLMetadataField.TYPE, MlMetadata.getMlMetadata(clusterService.state()))
.putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()));
when(clusterService.state()).thenReturn(cs.build());
Consumer<Exception> handler = mockConsumer();
@ -308,8 +308,8 @@ public class DatafeedManagerTests extends ESTestCase {
tasksBuilder = PersistentTasksCustomMetaData.builder();
addJobTask("job_id", "node_id", JobState.FAILED, tasksBuilder);
ClusterState.Builder updatedCs = ClusterState.builder(clusterService.state())
.metaData(new MetaData.Builder().putCustom(MLMetadataField.TYPE, clusterService.state().getMetaData()
.custom(MLMetadataField.TYPE)).putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()));
.metaData(new MetaData.Builder().putCustom(MLMetadataField.TYPE, MlMetadata.getMlMetadata(clusterService.state()))
.putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()));
capturedClusterStateListener.getValue().clusterChanged(new ClusterChangedEvent("_source", updatedCs.build(), cs.build()));
@ -322,8 +322,8 @@ public class DatafeedManagerTests extends ESTestCase {
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
addJobTask("job_id", "node_id", JobState.OPENING, tasksBuilder);
ClusterState.Builder cs = ClusterState.builder(clusterService.state())
.metaData(new MetaData.Builder().putCustom(MLMetadataField.TYPE, clusterService.state().getMetaData()
.custom(MLMetadataField.TYPE)).putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()));
.metaData(new MetaData.Builder().putCustom(MLMetadataField.TYPE, MlMetadata.getMlMetadata(clusterService.state()))
.putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()));
when(clusterService.state()).thenReturn(cs.build());
Consumer<Exception> handler = mockConsumer();
@ -340,8 +340,8 @@ public class DatafeedManagerTests extends ESTestCase {
tasksBuilder = PersistentTasksCustomMetaData.builder();
addJobTask("job_id", "node_id", JobState.OPENED, tasksBuilder);
ClusterState.Builder updatedCs = ClusterState.builder(clusterService.state())
.metaData(new MetaData.Builder().putCustom(MLMetadataField.TYPE, clusterService.state().getMetaData()
.custom(MLMetadataField.TYPE)).putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()));
.metaData(new MetaData.Builder().putCustom(MLMetadataField.TYPE, MlMetadata.getMlMetadata(clusterService.state()))
.putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()));
capturedClusterStateListener.getValue().clusterChanged(new ClusterChangedEvent("_source", cs.build(), updatedCs.build()));

View File

@ -103,7 +103,7 @@ public class DeleteJobIT extends BaseMlIntegTestCase {
}
private ClusterState markJobAsDeleted(String jobId, ClusterState currentState) {
MlMetadata mlMetadata = currentState.metaData().custom(MLMetadataField.TYPE);
MlMetadata mlMetadata = MlMetadata.getMlMetadata(currentState);
assertNotNull(mlMetadata);
MlMetadata.Builder builder = new MlMetadata.Builder(mlMetadata);
@ -116,7 +116,7 @@ public class DeleteJobIT extends BaseMlIntegTestCase {
}
private ClusterState removeJobFromClusterState(String jobId, ClusterState currentState) {
MlMetadata.Builder builder = new MlMetadata.Builder(currentState.metaData().custom(MLMetadataField.TYPE));
MlMetadata.Builder builder = new MlMetadata.Builder(MlMetadata.getMlMetadata(currentState));
builder.deleteJob(jobId, currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE));
ClusterState.Builder newState = ClusterState.builder(currentState);

View File

@ -326,7 +326,7 @@ public class JobManagerTests extends ESTestCase {
private ClusterState createClusterState() {
ClusterState.Builder builder = ClusterState.builder(new ClusterName("_name"));
builder.metaData(MetaData.builder().putCustom(MLMetadataField.TYPE, MlMetadata.EMPTY_METADATA));
builder.metaData(MetaData.builder());
return builder.build();
}
}

View File

@ -39,8 +39,6 @@ import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.MLMetadataField;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.action.util.QueryPage;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
@ -93,7 +91,7 @@ public class JobProviderTests extends ESTestCase {
AtomicReference<Boolean> resultHolder = new AtomicReference<>();
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
.metaData(MetaData.builder().putCustom(MLMetadataField.TYPE, MlMetadata.EMPTY_METADATA).indices(ImmutableOpenMap.of()))
.metaData(MetaData.builder().indices(ImmutableOpenMap.of()))
.build();
ClusterService clusterService = mock(ClusterService.class);
@ -157,7 +155,7 @@ public class JobProviderTests extends ESTestCase {
.fPut(AnomalyDetectorsIndex.jobResultsAliasedName("foo"), indexMetaData).build();
ClusterState cs2 = ClusterState.builder(new ClusterName("_name"))
.metaData(MetaData.builder().putCustom(MLMetadataField.TYPE, MlMetadata.EMPTY_METADATA).indices(indexMap)).build();
.metaData(MetaData.builder().indices(indexMap)).build();
ClusterService clusterService = mock(ClusterService.class);
@ -209,7 +207,7 @@ public class JobProviderTests extends ESTestCase {
ImmutableOpenMap<String, IndexMetaData> indexMap = ImmutableOpenMap.<String, IndexMetaData>builder().build();
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
.metaData(MetaData.builder().putCustom(MLMetadataField.TYPE, MlMetadata.EMPTY_METADATA).indices(indexMap)).build();
.metaData(MetaData.builder().indices(indexMap)).build();
ClusterService clusterService = mock(ClusterService.class);

View File

@ -28,7 +28,6 @@ import org.elasticsearch.test.MockHttpTransport;
import org.elasticsearch.test.discovery.TestZenDiscovery;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.ml.LocalStateMachineLearning;
import org.elasticsearch.xpack.core.ml.MLMetadataField;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.core.ml.MachineLearningField;
import org.elasticsearch.xpack.core.ml.MlMetadata;
@ -272,8 +271,7 @@ public abstract class BaseMlIntegTestCase extends ESIntegTestCase {
}
public static void deleteAllDatafeeds(Logger logger, Client client) throws Exception {
MetaData metaData = client.admin().cluster().prepareState().get().getState().getMetaData();
MlMetadata mlMetadata = metaData.custom(MLMetadataField.TYPE);
MlMetadata mlMetadata = MlMetadata.getMlMetadata(client.admin().cluster().prepareState().get().getState());
try {
logger.info("Closing all datafeeds (using _all)");
StopDatafeedAction.Response stopResponse = client
@ -312,8 +310,7 @@ public abstract class BaseMlIntegTestCase extends ESIntegTestCase {
}
public static void deleteAllJobs(Logger logger, Client client) throws Exception {
MetaData metaData = client.admin().cluster().prepareState().get().getState().getMetaData();
MlMetadata mlMetadata = metaData.custom(MLMetadataField.TYPE);
MlMetadata mlMetadata = MlMetadata.getMlMetadata(client.admin().cluster().prepareState().get().getState());
try {
CloseJobAction.Request closeRequest = new CloseJobAction.Request(MetaData.ALL);