[ML][7.x] Add lazy assignment job config option (#47993)

This change adds:

- A new option, allow_lazy_open, to anomaly detection jobs
- A new option, allow_lazy_start, to data frame analytics jobs

Both work in the same way: they allow a job to be
opened/started even if no ML node exists that can
accommodate the job immediately. In this situation
the job waits in the opening/starting state until ML
node capacity is available. (The starting state for data
frame analytics jobs is new in this change.)
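For illustration, a minimal sketch of how the new flags might be set
through the Java high-level REST client builders touched in this
commit (the job ID and the `client` variable, a RestHighLevelClient,
are hypothetical):

    import org.elasticsearch.client.RequestOptions;
    import org.elasticsearch.client.ml.UpdateJobRequest;
    import org.elasticsearch.client.ml.job.config.JobUpdate;

    // Allow an existing anomaly detection job to open lazily. If no ML
    // node has capacity, the job waits in the "opening" state instead
    // of the open request returning an error.
    JobUpdate update = new JobUpdate.Builder("my-anomaly-job")
        .setAllowLazyOpen(true)
        .build();
    client.machineLearning().updateJob(new UpdateJobRequest(update), RequestOptions.DEFAULT);

The data frame analytics equivalent is setAllowLazyStart(true) on the
analytics config builder; such a job waits in the "starting" state.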

Additionally, the ML nightly maintenance task now
creates audit warnings for ML jobs that are unassigned.
This means that jobs that cannot be assigned to an ML
node for a very long time will show a yellow warning
triangle in the UI.

A final change is that it is now possible to close a job
that is not assigned to a node without using force.
Previously, jobs that were open but not assigned to a
node were an aberration; after this change they will be
relatively common.
David Roberts 2019-10-15 06:55:11 +01:00 committed by GitHub
parent 300ddfa3c1
commit 984323783e
47 changed files with 718 additions and 212 deletions

View File

@ -56,6 +56,7 @@ public class DataFrameAnalyticsConfig implements ToXContentObject {
private static final ParseField MODEL_MEMORY_LIMIT = new ParseField("model_memory_limit");
private static final ParseField CREATE_TIME = new ParseField("create_time");
private static final ParseField VERSION = new ParseField("version");
private static final ParseField ALLOW_LAZY_START = new ParseField("allow_lazy_start");
private static ObjectParser<Builder, Void> PARSER = new ObjectParser<>("data_frame_analytics_config", true, Builder::new);
@ -86,6 +87,7 @@ public class DataFrameAnalyticsConfig implements ToXContentObject {
},
VERSION,
ValueType.STRING);
PARSER.declareBoolean(Builder::setAllowLazyStart, ALLOW_LAZY_START);
}
private static DataFrameAnalysis parseAnalysis(XContentParser parser) throws IOException {
@ -105,11 +107,12 @@ public class DataFrameAnalyticsConfig implements ToXContentObject {
private final ByteSizeValue modelMemoryLimit;
private final Instant createTime;
private final Version version;
private final Boolean allowLazyStart;
private DataFrameAnalyticsConfig(@Nullable String id, @Nullable String description, @Nullable DataFrameAnalyticsSource source,
@Nullable DataFrameAnalyticsDest dest, @Nullable DataFrameAnalysis analysis,
@Nullable FetchSourceContext analyzedFields, @Nullable ByteSizeValue modelMemoryLimit,
@Nullable Instant createTime, @Nullable Version version) {
@Nullable Instant createTime, @Nullable Version version, @Nullable Boolean allowLazyStart) {
this.id = id;
this.description = description;
this.source = source;
@ -119,6 +122,7 @@ public class DataFrameAnalyticsConfig implements ToXContentObject {
this.modelMemoryLimit = modelMemoryLimit;
this.createTime = createTime == null ? null : Instant.ofEpochMilli(createTime.toEpochMilli());
this.version = version;
this.allowLazyStart = allowLazyStart;
}
public String getId() {
@ -157,6 +161,10 @@ public class DataFrameAnalyticsConfig implements ToXContentObject {
return version;
}
public Boolean getAllowLazyStart() {
return allowLazyStart;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
@ -190,6 +198,9 @@ public class DataFrameAnalyticsConfig implements ToXContentObject {
if (version != null) {
builder.field(VERSION.getPreferredName(), version);
}
if (allowLazyStart != null) {
builder.field(ALLOW_LAZY_START.getPreferredName(), allowLazyStart);
}
builder.endObject();
return builder;
}
@ -208,12 +219,13 @@ public class DataFrameAnalyticsConfig implements ToXContentObject {
&& Objects.equals(analyzedFields, other.analyzedFields)
&& Objects.equals(modelMemoryLimit, other.modelMemoryLimit)
&& Objects.equals(createTime, other.createTime)
&& Objects.equals(version, other.version);
&& Objects.equals(version, other.version)
&& Objects.equals(allowLazyStart, other.allowLazyStart);
}
@Override
public int hashCode() {
return Objects.hash(id, description, source, dest, analysis, analyzedFields, modelMemoryLimit, createTime, version);
return Objects.hash(id, description, source, dest, analysis, analyzedFields, modelMemoryLimit, createTime, version, allowLazyStart);
}
@Override
@ -232,6 +244,7 @@ public class DataFrameAnalyticsConfig implements ToXContentObject {
private ByteSizeValue modelMemoryLimit;
private Instant createTime;
private Version version;
private Boolean allowLazyStart;
private Builder() {}
@ -280,9 +293,14 @@ public class DataFrameAnalyticsConfig implements ToXContentObject {
return this;
}
public Builder setAllowLazyStart(Boolean allowLazyStart) {
this.allowLazyStart = allowLazyStart;
return this;
}
public DataFrameAnalyticsConfig build() {
return new DataFrameAnalyticsConfig(id, description, source, dest, analysis, analyzedFields, modelMemoryLimit, createTime,
version);
version, allowLazyStart);
}
}
}

View File

@ -22,7 +22,7 @@ package org.elasticsearch.client.ml.dataframe;
import java.util.Locale;
public enum DataFrameAnalyticsState {
STARTED, REINDEXING, ANALYZING, STOPPING, STOPPED;
STARTED, REINDEXING, ANALYZING, STOPPING, STOPPED, STARTING;
public static DataFrameAnalyticsState fromString(String name) {
return valueOf(name.trim().toUpperCase(Locale.ROOT));

View File

@ -67,6 +67,7 @@ public class Job implements ToXContentObject {
public static final ParseField MODEL_SNAPSHOT_ID = new ParseField("model_snapshot_id");
public static final ParseField RESULTS_INDEX_NAME = new ParseField("results_index_name");
public static final ParseField DELETING = new ParseField("deleting");
public static final ParseField ALLOW_LAZY_OPEN = new ParseField("allow_lazy_open");
public static final ObjectParser<Builder, Void> PARSER = new ObjectParser<>("job_details", true, Builder::new);
@ -96,6 +97,7 @@ public class Job implements ToXContentObject {
PARSER.declareStringOrNull(Builder::setModelSnapshotId, MODEL_SNAPSHOT_ID);
PARSER.declareString(Builder::setResultsIndexName, RESULTS_INDEX_NAME);
PARSER.declareBoolean(Builder::setDeleting, DELETING);
PARSER.declareBoolean(Builder::setAllowLazyOpen, ALLOW_LAZY_OPEN);
}
private final String jobId;
@ -117,13 +119,14 @@ public class Job implements ToXContentObject {
private final String modelSnapshotId;
private final String resultsIndexName;
private final Boolean deleting;
private final Boolean allowLazyOpen;
private Job(String jobId, String jobType, List<String> groups, String description,
Date createTime, Date finishedTime,
AnalysisConfig analysisConfig, AnalysisLimits analysisLimits, DataDescription dataDescription,
ModelPlotConfig modelPlotConfig, Long renormalizationWindowDays, TimeValue backgroundPersistInterval,
Long modelSnapshotRetentionDays, Long resultsRetentionDays, Map<String, Object> customSettings,
String modelSnapshotId, String resultsIndexName, Boolean deleting) {
String modelSnapshotId, String resultsIndexName, Boolean deleting, Boolean allowLazyOpen) {
this.jobId = jobId;
this.jobType = jobType;
@ -143,6 +146,7 @@ public class Job implements ToXContentObject {
this.modelSnapshotId = modelSnapshotId;
this.resultsIndexName = resultsIndexName;
this.deleting = deleting;
this.allowLazyOpen = allowLazyOpen;
}
/**
@ -271,6 +275,10 @@ public class Job implements ToXContentObject {
return deleting;
}
public Boolean getAllowLazyOpen() {
return allowLazyOpen;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
@ -326,6 +334,9 @@ public class Job implements ToXContentObject {
if (deleting != null) {
builder.field(DELETING.getPreferredName(), deleting);
}
if (allowLazyOpen != null) {
builder.field(ALLOW_LAZY_OPEN.getPreferredName(), allowLazyOpen);
}
builder.endObject();
return builder;
}
@ -358,7 +369,8 @@ public class Job implements ToXContentObject {
&& Objects.equals(this.customSettings, that.customSettings)
&& Objects.equals(this.modelSnapshotId, that.modelSnapshotId)
&& Objects.equals(this.resultsIndexName, that.resultsIndexName)
&& Objects.equals(this.deleting, that.deleting);
&& Objects.equals(this.deleting, that.deleting)
&& Objects.equals(this.allowLazyOpen, that.allowLazyOpen);
}
@Override
@ -366,7 +378,7 @@ public class Job implements ToXContentObject {
return Objects.hash(jobId, jobType, groups, description, createTime, finishedTime,
analysisConfig, analysisLimits, dataDescription, modelPlotConfig, renormalizationWindowDays,
backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, customSettings,
modelSnapshotId, resultsIndexName, deleting);
modelSnapshotId, resultsIndexName, deleting, allowLazyOpen);
}
@Override
@ -398,6 +410,7 @@ public class Job implements ToXContentObject {
private String modelSnapshotId;
private String resultsIndexName;
private Boolean deleting;
private Boolean allowLazyOpen;
private Builder() {
}
@ -425,6 +438,7 @@ public class Job implements ToXContentObject {
this.modelSnapshotId = job.getModelSnapshotId();
this.resultsIndexName = job.getResultsIndexNameNoPrefix();
this.deleting = job.getDeleting();
this.allowLazyOpen = job.getAllowLazyOpen();
}
public Builder setId(String id) {
@ -521,6 +535,11 @@ public class Job implements ToXContentObject {
return this;
}
Builder setAllowLazyOpen(Boolean allowLazyOpen) {
this.allowLazyOpen = allowLazyOpen;
return this;
}
/**
* Builds a job.
*
@ -533,7 +552,7 @@ public class Job implements ToXContentObject {
id, jobType, groups, description, createTime, finishedTime,
analysisConfig, analysisLimits, dataDescription, modelPlotConfig, renormalizationWindowDays,
backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, customSettings,
modelSnapshotId, resultsIndexName, deleting);
modelSnapshotId, resultsIndexName, deleting, allowLazyOpen);
}
}
}

View File

@ -54,6 +54,7 @@ public class JobUpdate implements ToXContentObject {
PARSER.declareLong(Builder::setModelSnapshotRetentionDays, Job.MODEL_SNAPSHOT_RETENTION_DAYS);
PARSER.declareStringArray(Builder::setCategorizationFilters, AnalysisConfig.CATEGORIZATION_FILTERS);
PARSER.declareField(Builder::setCustomSettings, (p, c) -> p.map(), Job.CUSTOM_SETTINGS, ObjectParser.ValueType.OBJECT);
PARSER.declareBoolean(Builder::setAllowLazyOpen, Job.ALLOW_LAZY_OPEN);
}
private final String jobId;
@ -68,13 +69,14 @@ public class JobUpdate implements ToXContentObject {
private final Long resultsRetentionDays;
private final List<String> categorizationFilters;
private final Map<String, Object> customSettings;
private final Boolean allowLazyOpen;
private JobUpdate(String jobId, @Nullable List<String> groups, @Nullable String description,
@Nullable List<DetectorUpdate> detectorUpdates, @Nullable ModelPlotConfig modelPlotConfig,
@Nullable AnalysisLimits analysisLimits, @Nullable TimeValue backgroundPersistInterval,
@Nullable Long renormalizationWindowDays, @Nullable Long resultsRetentionDays,
@Nullable Long modelSnapshotRetentionDays, @Nullable List<String> categorisationFilters,
@Nullable Map<String, Object> customSettings) {
@Nullable Map<String, Object> customSettings, @Nullable Boolean allowLazyOpen) {
this.jobId = jobId;
this.groups = groups;
this.description = description;
@ -87,6 +89,7 @@ public class JobUpdate implements ToXContentObject {
this.resultsRetentionDays = resultsRetentionDays;
this.categorizationFilters = categorisationFilters;
this.customSettings = customSettings;
this.allowLazyOpen = allowLazyOpen;
}
public String getJobId() {
@ -137,6 +140,10 @@ public class JobUpdate implements ToXContentObject {
return customSettings;
}
public Boolean getAllowLazyOpen() {
return allowLazyOpen;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
@ -174,6 +181,9 @@ public class JobUpdate implements ToXContentObject {
if (customSettings != null) {
builder.field(Job.CUSTOM_SETTINGS.getPreferredName(), customSettings);
}
if (allowLazyOpen != null) {
builder.field(Job.ALLOW_LAZY_OPEN.getPreferredName(), allowLazyOpen);
}
builder.endObject();
return builder;
}
@ -201,13 +211,15 @@ public class JobUpdate implements ToXContentObject {
&& Objects.equals(this.modelSnapshotRetentionDays, that.modelSnapshotRetentionDays)
&& Objects.equals(this.resultsRetentionDays, that.resultsRetentionDays)
&& Objects.equals(this.categorizationFilters, that.categorizationFilters)
&& Objects.equals(this.customSettings, that.customSettings);
&& Objects.equals(this.customSettings, that.customSettings)
&& Objects.equals(this.allowLazyOpen, that.allowLazyOpen);
}
@Override
public int hashCode() {
return Objects.hash(jobId, groups, description, detectorUpdates, modelPlotConfig, analysisLimits, renormalizationWindowDays,
backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, categorizationFilters, customSettings);
backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, categorizationFilters, customSettings,
allowLazyOpen);
}
public static class DetectorUpdate implements ToXContentObject {
@ -303,6 +315,7 @@ public class JobUpdate implements ToXContentObject {
private Long resultsRetentionDays;
private List<String> categorizationFilters;
private Map<String, Object> customSettings;
private Boolean allowLazyOpen;
/**
* New {@link JobUpdate.Builder} object for the existing job
@ -446,9 +459,15 @@ public class JobUpdate implements ToXContentObject {
return this;
}
public Builder setAllowLazyOpen(boolean allowLazyOpen) {
this.allowLazyOpen = allowLazyOpen;
return this;
}
public JobUpdate build() {
return new JobUpdate(jobId, groups, description, detectorUpdates, modelPlotConfig, analysisLimits, backgroundPersistInterval,
renormalizationWindowDays, resultsRetentionDays, modelSnapshotRetentionDays, categorizationFilters, customSettings);
renormalizationWindowDays, resultsRetentionDays, modelSnapshotRetentionDays, categorizationFilters, customSettings,
allowLazyOpen);
}
}
}

View File

@ -66,6 +66,9 @@ public class DataFrameAnalyticsConfigTests extends AbstractXContentTestCase<Data
if (randomBoolean()) {
builder.setVersion(Version.CURRENT);
}
if (randomBoolean()) {
builder.setAllowLazyStart(randomBoolean());
}
return builder.build();
}

View File

@ -159,6 +159,9 @@ public class JobTests extends AbstractXContentTestCase<Job> {
if (randomBoolean()) {
builder.setDeleting(randomBoolean());
}
if (randomBoolean()) {
builder.setAllowLazyOpen(randomBoolean());
}
return builder;
}

View File

@ -79,6 +79,9 @@ public class JobUpdateTests extends AbstractXContentTestCase<JobUpdate> {
if (randomBoolean()) {
update.setCustomSettings(Collections.singletonMap(randomAlphaOfLength(10), randomAlphaOfLength(10)));
}
if (randomBoolean()) {
update.setAllowLazyOpen(randomBoolean());
}
return update.build();
}

View File

@ -122,7 +122,8 @@ The API returns the following results:
"time_format": "epoch_ms"
},
"model_snapshot_retention_days": 1,
"results_index_name": "shared"
"results_index_name": "shared",
"allow_lazy_open": false
}
]
}

View File

@ -95,6 +95,19 @@ so do not set the `background_persist_interval` value too low.
deleted from Elasticsearch. The default value is null, which means results
are retained.
`allow_lazy_open`::
(boolean) Advanced configuration option.
Whether this job should be allowed to open when there is insufficient
{ml} node capacity for it to be immediately assigned to a node.
The default is `false`, which means that the <<ml-open-job>> API
returns an error if a {ml} node with capacity to run the job cannot
immediately be found. (However, this is also subject to the
cluster-wide `xpack.ml.max_lazy_ml_nodes` setting; see
<<advanced-ml-settings>>.) If this option is set to `true`, the
<<ml-open-job>> API does not return an error and the job waits in the
`opening` state until sufficient {ml} node capacity is available.
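For example (a sketch that is not part of the asciidoc source; the
`client` variable and job ID are hypothetical), opening such a job from
the Java high-level REST client succeeds even when no {ml} node
currently has capacity:

    import org.elasticsearch.client.RequestOptions;
    import org.elasticsearch.client.ml.OpenJobRequest;

    // With allow_lazy_open=true this call returns success and the job
    // remains in the "opening" state until an ML node can take it.
    client.machineLearning().openJob(new OpenJobRequest("my-job"), RequestOptions.DEFAULT);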
[[ml-analysisconfig]]
==== Analysis Configuration Objects

View File

@ -149,7 +149,8 @@ When the job is created, you receive the following results:
"time_format" : "epoch_ms"
},
"model_snapshot_retention_days" : 1,
"results_index_name" : "shared"
"results_index_name" : "shared",
"allow_lazy_open" : false
}
----
// TESTRESPONSE[s/"job_version" : "7.4.0"/"job_version" : $body.job_version/]

View File

@ -74,11 +74,14 @@ See <<ml-job-resource>>. | Yes
|`results_retention_days` |Advanced configuration option. The number of days
for which job results are retained. See <<ml-job-resource>>. | Yes
|`allow_lazy_open` |Advanced configuration option. Whether to allow the job to be
opened when no {ml} node has sufficient capacity. See <<ml-job-resource>>. | Yes
|=======================================================================
For those properties that have `Requires Restart` set to `Yes` in this table,
if the job is open when you make the update, you must stop the data feed, close
the job, then restart the data feed and open the job for the changes to take
the job, then reopen the job and restart the data feed for the changes to take
effect.
[NOTE]
@ -170,7 +173,8 @@ configuration information, including the updated property values. For example:
}
]
},
"results_index_name": "shared"
"results_index_name": "shared",
"allow_lazy_open": false
}
----
// TESTRESPONSE[s/"job_version": "7.0.0-alpha1"/"job_version": $body.job_version/]

View File

@ -138,6 +138,18 @@ that don't contain a results field are not included in the {reganalysis}.
as this object is passed verbatim to {es}. By default, this property has
the following value: `{"match_all": {}}`.
`allow_lazy_start`::
(Optional, boolean) Whether this job should be allowed to start when there
is insufficient {ml} node capacity for it to be immediately assigned to a node.
The default is `false`, which means that the <<start-dfanalytics>> API
returns an error if a {ml} node with capacity to run the job cannot
immediately be found. (However, this is also subject to the
cluster-wide `xpack.ml.max_lazy_ml_nodes` setting; see
<<advanced-ml-settings>>.) If this option is set to `true`, the
<<start-dfanalytics>> API does not return an error and the job waits in the
`starting` state until sufficient {ml} node capacity is available.
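Analogously (a sketch; the `client` variable and analytics ID are
hypothetical), starting such a job from the Java high-level REST client:

    import org.elasticsearch.client.RequestOptions;
    import org.elasticsearch.client.ml.StartDataFrameAnalyticsRequest;

    // With allow_lazy_start=true this call returns success and the job
    // waits in the "starting" state until ML node capacity is available.
    client.machineLearning().startDataFrameAnalytics(
        new StartDataFrameAnalyticsRequest("my-analytics"), RequestOptions.DEFAULT);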
[[ml-put-dfanalytics-example]]
==== {api-examples-title}
@ -199,7 +211,8 @@ The API returns the following result:
},
"model_memory_limit" : "1gb",
"create_time" : 1562351429434,
"version" : "7.3.0"
"version" : "7.3.0",
"allow_lazy_start" : false
}
----
// TESTRESPONSE[s/1562351429434/$body.$_path/]
@ -259,7 +272,8 @@ The API returns the following result:
},
"model_memory_limit" : "1gb",
"create_time" : 1567168659127,
"version" : "8.0.0"
"version" : "8.0.0",
"allow_lazy_start" : false
}
----
// TESTRESPONSE[s/1567168659127/$body.$_path/]
@ -290,4 +304,4 @@ PUT _ml/data_frame/analytics/student_performance_mathematics_0.3
// TEST[skip:TBD]
<1> The `training_percent` defines the percentage of the data set that will be used
for training the model.
for training the model.

View File

@ -143,12 +143,14 @@ public final class MlTasks {
public static DataFrameAnalyticsState getDataFrameAnalyticsState(String analyticsId, @Nullable PersistentTasksCustomMetaData tasks) {
PersistentTasksCustomMetaData.PersistentTask<?> task = getDataFrameAnalyticsTask(analyticsId, tasks);
if (task != null && task.getState() != null) {
if (task != null) {
DataFrameAnalyticsTaskState taskState = (DataFrameAnalyticsTaskState) task.getState();
if (taskState == null) {
return DataFrameAnalyticsState.STARTING;
}
return taskState.getState();
} else {
return DataFrameAnalyticsState.STOPPED;
}
return DataFrameAnalyticsState.STOPPED;
}
/**
@ -178,32 +180,29 @@ public final class MlTasks {
* @param nodes The cluster nodes
* @return The job Ids of tasks that do not have an assignment.
*/
public static Set<String> unallocatedJobIds(@Nullable PersistentTasksCustomMetaData tasks,
DiscoveryNodes nodes) {
return unallocatedJobTasks(tasks, nodes).stream()
public static Set<String> unassignedJobIds(@Nullable PersistentTasksCustomMetaData tasks,
DiscoveryNodes nodes) {
return unassignedJobTasks(tasks, nodes).stream()
.map(task -> task.getId().substring(JOB_TASK_ID_PREFIX.length()))
.collect(Collectors.toSet());
}
/**
* The job tasks that do not have an allocation as determined by
* The job tasks that do not have an assignment as determined by
* {@link PersistentTasksClusterService#needsReassignment(PersistentTasksCustomMetaData.Assignment, DiscoveryNodes)}
*
* @param tasks Persistent tasks. If null an empty set is returned.
* @param nodes The cluster nodes
* @return Unallocated job tasks
* @return Unassigned job tasks
*/
public static Collection<PersistentTasksCustomMetaData.PersistentTask<?>> unallocatedJobTasks(
public static Collection<PersistentTasksCustomMetaData.PersistentTask<?>> unassignedJobTasks(
@Nullable PersistentTasksCustomMetaData tasks,
DiscoveryNodes nodes) {
if (tasks == null) {
return Collections.emptyList();
}
return tasks.findTasks(JOB_TASK_NAME, task -> true)
.stream()
.filter(task -> PersistentTasksClusterService.needsReassignment(task.getAssignment(), nodes))
.collect(Collectors.toList());
return tasks.findTasks(JOB_TASK_NAME, task -> PersistentTasksClusterService.needsReassignment(task.getAssignment(), nodes));
}
/**
@ -231,32 +230,29 @@ public final class MlTasks {
* @param nodes The cluster nodes
* @return The datafeed Ids of tasks that do not have an assignment.
*/
public static Set<String> unallocatedDatafeedIds(@Nullable PersistentTasksCustomMetaData tasks,
DiscoveryNodes nodes) {
public static Set<String> unassignedDatafeedIds(@Nullable PersistentTasksCustomMetaData tasks,
DiscoveryNodes nodes) {
return unallocatedDatafeedTasks(tasks, nodes).stream()
return unassignedDatafeedTasks(tasks, nodes).stream()
.map(task -> task.getId().substring(DATAFEED_TASK_ID_PREFIX.length()))
.collect(Collectors.toSet());
}
/**
* The datafeed tasks that do not have an allocation as determined by
* The datafeed tasks that do not have an assignment as determined by
* {@link PersistentTasksClusterService#needsReassignment(PersistentTasksCustomMetaData.Assignment, DiscoveryNodes)}
*
* @param tasks Persistent tasks. If null an empty set is returned.
* @param nodes The cluster nodes
* @return Unallocated datafeed tasks
* @return Unassigned datafeed tasks
*/
public static Collection<PersistentTasksCustomMetaData.PersistentTask<?>> unallocatedDatafeedTasks(
public static Collection<PersistentTasksCustomMetaData.PersistentTask<?>> unassignedDatafeedTasks(
@Nullable PersistentTasksCustomMetaData tasks,
DiscoveryNodes nodes) {
if (tasks == null) {
return Collections.emptyList();
}
return tasks.findTasks(DATAFEED_TASK_NAME, task -> true)
.stream()
.filter(task -> PersistentTasksClusterService.needsReassignment(task.getAssignment(), nodes))
.collect(Collectors.toList());
return tasks.findTasks(DATAFEED_TASK_NAME, task -> PersistentTasksClusterService.needsReassignment(task.getAssignment(), nodes));
}
}

View File

@ -209,6 +209,7 @@ public class GetDataFrameAnalyticsStatsAction extends ActionType<GetDataFrameAna
case REINDEXING:
reindexingProgress = legacyProgressPercent;
break;
case STARTING:
case STARTED:
case STOPPED:
case STOPPING:
@ -235,6 +236,14 @@ public class GetDataFrameAnalyticsStatsAction extends ActionType<GetDataFrameAna
return progress;
}
public DiscoveryNode getNode() {
return node;
}
public String getAssignmentExplanation() {
return assignmentExplanation;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
// TODO: Have callers wrap the content with an object as they choose rather than forcing it upon them
@ -297,6 +306,7 @@ public class GetDataFrameAnalyticsStatsAction extends ActionType<GetDataFrameAna
case REINDEXING:
targetPhase = "reindexing";
break;
case STARTING:
case STARTED:
case STOPPED:
case STOPPING:

View File

@ -156,13 +156,16 @@ public class StartDataFrameAnalyticsAction extends ActionType<AcknowledgedRespon
private static final ParseField PROGRESS_ON_START = new ParseField("progress_on_start");
@SuppressWarnings("unchecked")
public static ConstructingObjectParser<TaskParams, Void> PARSER = new ConstructingObjectParser<>(
MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, true, a -> new TaskParams((String) a[0], (String) a[1], (List<PhaseProgress>) a[2]));
MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, true,
a -> new TaskParams((String) a[0], (String) a[1], (List<PhaseProgress>) a[2], (Boolean) a[3]));
static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), DataFrameAnalyticsConfig.ID);
PARSER.declareString(ConstructingObjectParser.constructorArg(), DataFrameAnalyticsConfig.VERSION);
PARSER.declareObjectArray(ConstructingObjectParser.optionalConstructorArg(), PhaseProgress.PARSER, PROGRESS_ON_START);
PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), DataFrameAnalyticsConfig.ALLOW_LAZY_START);
}
public static TaskParams fromXContent(XContentParser parser) {
@ -172,15 +175,18 @@ public class StartDataFrameAnalyticsAction extends ActionType<AcknowledgedRespon
private final String id;
private final Version version;
private final List<PhaseProgress> progressOnStart;
private final boolean allowLazyStart;
public TaskParams(String id, Version version, List<PhaseProgress> progressOnStart) {
public TaskParams(String id, Version version, List<PhaseProgress> progressOnStart, boolean allowLazyStart) {
this.id = Objects.requireNonNull(id);
this.version = Objects.requireNonNull(version);
this.progressOnStart = Collections.unmodifiableList(progressOnStart);
this.allowLazyStart = allowLazyStart;
}
private TaskParams(String id, String version, @Nullable List<PhaseProgress> progressOnStart) {
this(id, Version.fromString(version), progressOnStart == null ? Collections.emptyList() : progressOnStart);
private TaskParams(String id, String version, @Nullable List<PhaseProgress> progressOnStart, Boolean allowLazyStart) {
this(id, Version.fromString(version), progressOnStart == null ? Collections.emptyList() : progressOnStart,
allowLazyStart != null && allowLazyStart);
}
public TaskParams(StreamInput in) throws IOException {
@ -191,6 +197,11 @@ public class StartDataFrameAnalyticsAction extends ActionType<AcknowledgedRespon
} else {
progressOnStart = Collections.emptyList();
}
if (in.getVersion().onOrAfter(Version.V_7_5_0)) {
allowLazyStart = in.readBoolean();
} else {
allowLazyStart = false;
}
}
public String getId() {
@ -201,6 +212,10 @@ public class StartDataFrameAnalyticsAction extends ActionType<AcknowledgedRespon
return progressOnStart;
}
public boolean isAllowLazyStart() {
return allowLazyStart;
}
@Override
public String getWriteableName() {
return MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME;
@ -218,6 +233,9 @@ public class StartDataFrameAnalyticsAction extends ActionType<AcknowledgedRespon
if (out.getVersion().onOrAfter(Version.V_7_5_0)) {
out.writeList(progressOnStart);
}
if (out.getVersion().onOrAfter(Version.V_7_5_0)) {
out.writeBoolean(allowLazyStart);
}
}
@Override
@ -226,13 +244,14 @@ public class StartDataFrameAnalyticsAction extends ActionType<AcknowledgedRespon
builder.field(DataFrameAnalyticsConfig.ID.getPreferredName(), id);
builder.field(DataFrameAnalyticsConfig.VERSION.getPreferredName(), version);
builder.field(PROGRESS_ON_START.getPreferredName(), progressOnStart);
builder.field(DataFrameAnalyticsConfig.ALLOW_LAZY_START.getPreferredName(), allowLazyStart);
builder.endObject();
return builder;
}
@Override
public int hashCode() {
return Objects.hash(id, version, progressOnStart);
return Objects.hash(id, version, progressOnStart, allowLazyStart);
}
@Override
@ -243,7 +262,8 @@ public class StartDataFrameAnalyticsAction extends ActionType<AcknowledgedRespon
TaskParams other = (TaskParams) o;
return Objects.equals(id, other.id)
&& Objects.equals(version, other.version)
&& Objects.equals(progressOnStart, other.progressOnStart);
&& Objects.equals(progressOnStart, other.progressOnStart)
&& Objects.equals(allowLazyStart, other.allowLazyStart);
}
}

View File

@ -58,6 +58,7 @@ public class DataFrameAnalyticsConfig implements ToXContentObject, Writeable {
public static final ParseField HEADERS = new ParseField("headers");
public static final ParseField CREATE_TIME = new ParseField("create_time");
public static final ParseField VERSION = new ParseField("version");
public static final ParseField ALLOW_LAZY_START = new ParseField("allow_lazy_start");
public static final ObjectParser<Builder, Void> STRICT_PARSER = createParser(false);
public static final ObjectParser<Builder, Void> LENIENT_PARSER = createParser(true);
@ -77,6 +78,7 @@ public class DataFrameAnalyticsConfig implements ToXContentObject, Writeable {
OBJECT_ARRAY_BOOLEAN_OR_STRING);
parser.declareField(Builder::setModelMemoryLimit,
(p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), MODEL_MEMORY_LIMIT.getPreferredName()), MODEL_MEMORY_LIMIT, VALUE);
parser.declareBoolean(Builder::setAllowLazyStart, ALLOW_LAZY_START);
if (ignoreUnknownFields) {
// Headers are not parsed by the strict (config) parser, so headers supplied in the _body_ of a REST request will be rejected.
// (For config, headers are explicitly transferred from the auth headers by code in the put data frame actions.)
@ -123,10 +125,11 @@ public class DataFrameAnalyticsConfig implements ToXContentObject, Writeable {
private final Map<String, String> headers;
private final Instant createTime;
private final Version version;
private final boolean allowLazyStart;
public DataFrameAnalyticsConfig(String id, String description, DataFrameAnalyticsSource source, DataFrameAnalyticsDest dest,
DataFrameAnalysis analysis, Map<String, String> headers, ByteSizeValue modelMemoryLimit,
FetchSourceContext analyzedFields, Instant createTime, Version version) {
FetchSourceContext analyzedFields, Instant createTime, Version version, boolean allowLazyStart) {
this.id = ExceptionsHelper.requireNonNull(id, ID);
this.description = description;
this.source = ExceptionsHelper.requireNonNull(source, SOURCE);
@ -137,6 +140,7 @@ public class DataFrameAnalyticsConfig implements ToXContentObject, Writeable {
this.headers = Collections.unmodifiableMap(headers);
this.createTime = createTime == null ? null : Instant.ofEpochMilli(createTime.toEpochMilli());
this.version = version;
this.allowLazyStart = allowLazyStart;
}
public DataFrameAnalyticsConfig(StreamInput in) throws IOException {
@ -159,6 +163,11 @@ public class DataFrameAnalyticsConfig implements ToXContentObject, Writeable {
createTime = null;
version = null;
}
if (in.getVersion().onOrAfter(Version.V_7_5_0)) {
allowLazyStart = in.readBoolean();
} else {
allowLazyStart = false;
}
}
public String getId() {
@ -201,6 +210,10 @@ public class DataFrameAnalyticsConfig implements ToXContentObject, Writeable {
return version;
}
public boolean isAllowLazyStart() {
return allowLazyStart;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
@ -231,6 +244,7 @@ public class DataFrameAnalyticsConfig implements ToXContentObject, Writeable {
if (version != null) {
builder.field(VERSION.getPreferredName(), version);
}
builder.field(ALLOW_LAZY_START.getPreferredName(), allowLazyStart);
builder.endObject();
return builder;
}
@ -256,6 +270,9 @@ public class DataFrameAnalyticsConfig implements ToXContentObject, Writeable {
out.writeBoolean(false);
}
}
if (out.getVersion().onOrAfter(Version.V_7_5_0)) {
out.writeBoolean(allowLazyStart);
}
}
@Override
@ -273,12 +290,14 @@ public class DataFrameAnalyticsConfig implements ToXContentObject, Writeable {
&& Objects.equals(getModelMemoryLimit(), other.getModelMemoryLimit())
&& Objects.equals(analyzedFields, other.analyzedFields)
&& Objects.equals(createTime, other.createTime)
&& Objects.equals(version, other.version);
&& Objects.equals(version, other.version)
&& Objects.equals(allowLazyStart, other.allowLazyStart);
}
@Override
public int hashCode() {
return Objects.hash(id, description, source, dest, analysis, headers, getModelMemoryLimit(), analyzedFields, createTime, version);
return Objects.hash(id, description, source, dest, analysis, headers, getModelMemoryLimit(), analyzedFields, createTime, version,
allowLazyStart);
}
@Override
@ -303,6 +322,7 @@ public class DataFrameAnalyticsConfig implements ToXContentObject, Writeable {
private Map<String, String> headers = Collections.emptyMap();
private Instant createTime;
private Version version;
private boolean allowLazyStart;
public Builder() {}
@ -324,6 +344,7 @@ public class DataFrameAnalyticsConfig implements ToXContentObject, Writeable {
}
this.createTime = config.createTime;
this.version = config.version;
this.allowLazyStart = config.allowLazyStart;
}
public String getId() {
@ -380,13 +401,18 @@ public class DataFrameAnalyticsConfig implements ToXContentObject, Writeable {
return this;
}
public Builder setAllowLazyStart(boolean isLazyStart) {
this.allowLazyStart = isLazyStart;
return this;
}
/**
* Builds {@link DataFrameAnalyticsConfig} object.
*/
public DataFrameAnalyticsConfig build() {
applyMaxModelMemoryLimit();
return new DataFrameAnalyticsConfig(id, description, source, dest, analysis, headers, modelMemoryLimit, analyzedFields,
createTime, version);
createTime, version, allowLazyStart);
}
/**
@ -405,7 +431,8 @@ public class DataFrameAnalyticsConfig implements ToXContentObject, Writeable {
modelMemoryLimit,
analyzedFields,
createTime,
version);
version,
allowLazyStart);
}
private void applyMaxModelMemoryLimit() {

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.core.ml.dataframe;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
@ -15,7 +16,7 @@ import java.util.Locale;
public enum DataFrameAnalyticsState implements Writeable {
STARTED, REINDEXING, ANALYZING, STOPPING, STOPPED, FAILED;
STARTED, REINDEXING, ANALYZING, STOPPING, STOPPED, FAILED, STARTING;
public static DataFrameAnalyticsState fromString(String name) {
return valueOf(name.trim().toUpperCase(Locale.ROOT));
@ -27,7 +28,13 @@ public enum DataFrameAnalyticsState implements Writeable {
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeEnum(this);
DataFrameAnalyticsState toWrite = this;
if (out.getVersion().before(Version.V_7_5_0) && toWrite == STARTING) {
// Before 7.5.0 there was no STARTING state and jobs for which
// tasks existed but were unassigned were considered STOPPED
toWrite = STOPPED;
}
out.writeEnum(toWrite);
}
@Override

View File

@ -77,6 +77,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
public static final ParseField MODEL_SNAPSHOT_MIN_VERSION = new ParseField("model_snapshot_min_version");
public static final ParseField RESULTS_INDEX_NAME = new ParseField("results_index_name");
public static final ParseField DELETING = new ParseField("deleting");
public static final ParseField ALLOW_LAZY_OPEN = new ParseField("allow_lazy_open");
// Used for QueryPage
public static final ParseField RESULTS_FIELD = new ParseField("jobs");
@ -127,6 +128,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
parser.declareStringOrNull(Builder::setModelSnapshotMinVersion, MODEL_SNAPSHOT_MIN_VERSION);
parser.declareString(Builder::setResultsIndexName, RESULTS_INDEX_NAME);
parser.declareBoolean(Builder::setDeleting, DELETING);
parser.declareBoolean(Builder::setAllowLazyOpen, ALLOW_LAZY_OPEN);
return parser;
}
@ -159,13 +161,14 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
private final Version modelSnapshotMinVersion;
private final String resultsIndexName;
private final boolean deleting;
private final boolean allowLazyOpen;
private Job(String jobId, String jobType, Version jobVersion, List<String> groups, String description,
Date createTime, Date finishedTime,
AnalysisConfig analysisConfig, AnalysisLimits analysisLimits, DataDescription dataDescription,
ModelPlotConfig modelPlotConfig, Long renormalizationWindowDays, TimeValue backgroundPersistInterval,
Long modelSnapshotRetentionDays, Long resultsRetentionDays, Map<String, Object> customSettings,
String modelSnapshotId, Version modelSnapshotMinVersion, String resultsIndexName, boolean deleting) {
String modelSnapshotId, Version modelSnapshotMinVersion, String resultsIndexName, boolean deleting, boolean allowLazyOpen) {
this.jobId = jobId;
this.jobType = jobType;
@ -187,6 +190,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
this.modelSnapshotMinVersion = modelSnapshotMinVersion;
this.resultsIndexName = resultsIndexName;
this.deleting = deleting;
this.allowLazyOpen = allowLazyOpen;
}
public Job(StreamInput in) throws IOException {
@ -229,6 +233,11 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
}
resultsIndexName = in.readString();
deleting = in.readBoolean();
if (in.getVersion().onOrAfter(Version.V_7_5_0)) {
allowLazyOpen = in.readBoolean();
} else {
allowLazyOpen = false;
}
}
/**
@ -398,6 +407,10 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
return deleting;
}
public boolean allowLazyOpen() {
return allowLazyOpen;
}
/**
* Get all input data fields mentioned in the job configuration,
* namely analysis fields and the time field.
@ -495,6 +508,9 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
}
out.writeString(resultsIndexName);
out.writeBoolean(deleting);
if (out.getVersion().onOrAfter(Version.V_7_5_0)) {
out.writeBoolean(allowLazyOpen);
}
}
@Override
@ -559,6 +575,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
if (deleting) {
builder.field(DELETING.getPreferredName(), deleting);
}
builder.field(ALLOW_LAZY_OPEN.getPreferredName(), allowLazyOpen);
return builder;
}
@ -592,7 +609,8 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
&& Objects.equals(this.modelSnapshotId, that.modelSnapshotId)
&& Objects.equals(this.modelSnapshotMinVersion, that.modelSnapshotMinVersion)
&& Objects.equals(this.resultsIndexName, that.resultsIndexName)
&& Objects.equals(this.deleting, that.deleting);
&& Objects.equals(this.deleting, that.deleting)
&& Objects.equals(this.allowLazyOpen, that.allowLazyOpen);
}
@Override
@ -600,7 +618,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
return Objects.hash(jobId, jobType, jobVersion, groups, description, createTime, finishedTime,
analysisConfig, analysisLimits, dataDescription, modelPlotConfig, renormalizationWindowDays,
backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, customSettings,
modelSnapshotId, modelSnapshotMinVersion, resultsIndexName, deleting);
modelSnapshotId, modelSnapshotMinVersion, resultsIndexName, deleting, allowLazyOpen);
}
// Class already extends from AbstractDiffable, so copied from ToXContentToBytes#toString()
@ -648,6 +666,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
private Version modelSnapshotMinVersion;
private String resultsIndexName;
private boolean deleting;
private boolean allowLazyOpen;
public Builder() {
}
@ -677,6 +696,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
this.modelSnapshotMinVersion = job.getModelSnapshotMinVersion();
this.resultsIndexName = job.getResultsIndexNameNoPrefix();
this.deleting = job.isDeleting();
this.allowLazyOpen = job.allowLazyOpen();
}
public Builder(StreamInput in) throws IOException {
@ -718,6 +738,9 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
}
resultsIndexName = in.readOptionalString();
deleting = in.readBoolean();
if (in.getVersion().onOrAfter(Version.V_7_5_0)) {
allowLazyOpen = in.readBoolean();
}
}
public Builder setId(String id) {
@ -838,6 +861,11 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
return this;
}
public Builder setAllowLazyOpen(boolean allowLazyOpen) {
this.allowLazyOpen = allowLazyOpen;
return this;
}
/**
* Return the list of fields that have been set and are invalid to
* be set when the job is created e.g. model snapshot Id should not
@ -912,6 +940,9 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
}
out.writeOptionalString(resultsIndexName);
out.writeBoolean(deleting);
if (out.getVersion().onOrAfter(Version.V_7_5_0)) {
out.writeBoolean(allowLazyOpen);
}
}
@Override
@ -972,6 +1003,9 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
if (deleting) {
builder.field(DELETING.getPreferredName(), deleting);
}
if (allowLazyOpen) {
builder.field(ALLOW_LAZY_OPEN.getPreferredName(), allowLazyOpen);
}
builder.endObject();
return builder;
@ -1002,7 +1036,8 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
&& Objects.equals(this.modelSnapshotId, that.modelSnapshotId)
&& Objects.equals(this.modelSnapshotMinVersion, that.modelSnapshotMinVersion)
&& Objects.equals(this.resultsIndexName, that.resultsIndexName)
&& Objects.equals(this.deleting, that.deleting);
&& Objects.equals(this.deleting, that.deleting)
&& Objects.equals(this.allowLazyOpen, that.allowLazyOpen);
}
@Override
@ -1010,7 +1045,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
return Objects.hash(id, jobType, jobVersion, groups, description, analysisConfig, analysisLimits, dataDescription,
createTime, finishedTime, modelPlotConfig, renormalizationWindowDays,
backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, customSettings, modelSnapshotId,
modelSnapshotMinVersion, resultsIndexName, deleting);
modelSnapshotMinVersion, resultsIndexName, deleting, allowLazyOpen);
}
/**
@ -1137,7 +1172,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
id, jobType, jobVersion, groups, description, createTime, finishedTime,
analysisConfig, analysisLimits, dataDescription, modelPlotConfig, renormalizationWindowDays,
backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, customSettings,
modelSnapshotId, modelSnapshotMinVersion, resultsIndexName, deleting);
modelSnapshotId, modelSnapshotMinVersion, resultsIndexName, deleting, allowLazyOpen);
}
private void checkValidBackgroundPersistInterval() {

View File

@ -54,6 +54,7 @@ public class JobUpdate implements Writeable, ToXContentObject {
parser.declareLong(Builder::setModelSnapshotRetentionDays, Job.MODEL_SNAPSHOT_RETENTION_DAYS);
parser.declareStringArray(Builder::setCategorizationFilters, AnalysisConfig.CATEGORIZATION_FILTERS);
parser.declareField(Builder::setCustomSettings, (p, c) -> p.map(), Job.CUSTOM_SETTINGS, ObjectParser.ValueType.OBJECT);
parser.declareBoolean(Builder::setAllowLazyOpen, Job.ALLOW_LAZY_OPEN);
}
// These fields should not be set by a REST request
INTERNAL_PARSER.declareString(Builder::setModelSnapshotId, Job.MODEL_SNAPSHOT_ID);
@ -78,6 +79,7 @@ public class JobUpdate implements Writeable, ToXContentObject {
private final Version modelSnapshotMinVersion;
private final Version jobVersion;
private final Boolean clearJobFinishTime;
private final Boolean allowLazyOpen;
private JobUpdate(String jobId, @Nullable List<String> groups, @Nullable String description,
@Nullable List<DetectorUpdate> detectorUpdates, @Nullable ModelPlotConfig modelPlotConfig,
@ -85,7 +87,8 @@ public class JobUpdate implements Writeable, ToXContentObject {
@Nullable Long renormalizationWindowDays, @Nullable Long resultsRetentionDays,
@Nullable Long modelSnapshotRetentionDays, @Nullable List<String> categorisationFilters,
@Nullable Map<String, Object> customSettings, @Nullable String modelSnapshotId,
@Nullable Version modelSnapshotMinVersion, @Nullable Version jobVersion, @Nullable Boolean clearJobFinishTime) {
@Nullable Version modelSnapshotMinVersion, @Nullable Version jobVersion, @Nullable Boolean clearJobFinishTime,
@Nullable Boolean allowLazyOpen) {
this.jobId = jobId;
this.groups = groups;
this.description = description;
@ -102,6 +105,7 @@ public class JobUpdate implements Writeable, ToXContentObject {
this.modelSnapshotMinVersion = modelSnapshotMinVersion;
this.jobVersion = jobVersion;
this.clearJobFinishTime = clearJobFinishTime;
this.allowLazyOpen = allowLazyOpen;
}
public JobUpdate(StreamInput in) throws IOException {
@ -150,6 +154,11 @@ public class JobUpdate implements Writeable, ToXContentObject {
} else {
modelSnapshotMinVersion = null;
}
if (in.getVersion().onOrAfter(Version.V_7_5_0)) {
allowLazyOpen = in.readOptionalBoolean();
} else {
allowLazyOpen = null;
}
}
@Override
@ -199,6 +208,9 @@ public class JobUpdate implements Writeable, ToXContentObject {
out.writeBoolean(false);
}
}
if (out.getVersion().onOrAfter(Version.V_7_5_0)) {
out.writeOptionalBoolean(allowLazyOpen);
}
}
public String getJobId() {
@ -265,6 +277,10 @@ public class JobUpdate implements Writeable, ToXContentObject {
return clearJobFinishTime;
}
public Boolean getAllowLazyOpen() {
return allowLazyOpen;
}
public boolean isAutodetectProcessUpdate() {
return modelPlotConfig != null || detectorUpdates != null || groups != null;
}
@ -318,6 +334,9 @@ public class JobUpdate implements Writeable, ToXContentObject {
if (clearJobFinishTime != null) {
builder.field(CLEAR_JOB_FINISH_TIME.getPreferredName(), clearJobFinishTime);
}
if (allowLazyOpen != null) {
builder.field(Job.ALLOW_LAZY_OPEN.getPreferredName(), allowLazyOpen);
}
builder.endObject();
return builder;
}
@ -366,6 +385,9 @@ public class JobUpdate implements Writeable, ToXContentObject {
if (jobVersion != null) {
updateFields.add(Job.JOB_VERSION.getPreferredName());
}
if (allowLazyOpen != null) {
updateFields.add(Job.ALLOW_LAZY_OPEN.getPreferredName());
}
return updateFields;
}
@ -441,10 +463,12 @@ public class JobUpdate implements Writeable, ToXContentObject {
if (jobVersion != null) {
builder.setJobVersion(jobVersion);
}
if (clearJobFinishTime != null && clearJobFinishTime) {
builder.setFinishedTime(null);
}
if (allowLazyOpen != null) {
builder.setAllowLazyOpen(allowLazyOpen);
}
builder.setAnalysisConfig(newAnalysisConfig);
return builder.build();
@ -466,7 +490,8 @@ public class JobUpdate implements Writeable, ToXContentObject {
&& (modelSnapshotId == null || Objects.equals(modelSnapshotId, job.getModelSnapshotId()))
&& (modelSnapshotMinVersion == null || Objects.equals(modelSnapshotMinVersion, job.getModelSnapshotMinVersion()))
&& (jobVersion == null || Objects.equals(jobVersion, job.getJobVersion()))
&& ((clearJobFinishTime == null || clearJobFinishTime == false) || job.getFinishedTime() == null);
&& (clearJobFinishTime == null || clearJobFinishTime == false || job.getFinishedTime() == null)
&& (allowLazyOpen == null || Objects.equals(allowLazyOpen, job.allowLazyOpen()));
}
boolean updatesDetectors(Job job) {
@ -514,14 +539,15 @@ public class JobUpdate implements Writeable, ToXContentObject {
&& Objects.equals(this.modelSnapshotId, that.modelSnapshotId)
&& Objects.equals(this.modelSnapshotMinVersion, that.modelSnapshotMinVersion)
&& Objects.equals(this.jobVersion, that.jobVersion)
&& Objects.equals(this.clearJobFinishTime, that.clearJobFinishTime);
&& Objects.equals(this.clearJobFinishTime, that.clearJobFinishTime)
&& Objects.equals(this.allowLazyOpen, that.allowLazyOpen);
}
@Override
public int hashCode() {
return Objects.hash(jobId, groups, description, detectorUpdates, modelPlotConfig, analysisLimits, renormalizationWindowDays,
backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, categorizationFilters, customSettings,
modelSnapshotId, modelSnapshotMinVersion, jobVersion, clearJobFinishTime);
modelSnapshotId, modelSnapshotMinVersion, jobVersion, clearJobFinishTime, allowLazyOpen);
}
public static class DetectorUpdate implements Writeable, ToXContentObject {
@ -633,6 +659,7 @@ public class JobUpdate implements Writeable, ToXContentObject {
private Version modelSnapshotMinVersion;
private Version jobVersion;
private Boolean clearJobFinishTime;
private Boolean allowLazyOpen;
public Builder(String jobId) {
this.jobId = jobId;
@ -723,6 +750,11 @@ public class JobUpdate implements Writeable, ToXContentObject {
return this;
}
public Builder setAllowLazyOpen(boolean allowLazyOpen) {
this.allowLazyOpen = allowLazyOpen;
return this;
}
public Builder setClearFinishTime(boolean clearJobFinishTime) {
this.clearJobFinishTime = clearJobFinishTime;
return this;
@ -731,7 +763,7 @@ public class JobUpdate implements Writeable, ToXContentObject {
public JobUpdate build() {
return new JobUpdate(jobId, groups, description, detectorUpdates, modelPlotConfig, analysisLimits, backgroundPersistInterval,
renormalizationWindowDays, resultsRetentionDays, modelSnapshotRetentionDays, categorizationFilters, customSettings,
modelSnapshotId, modelSnapshotMinVersion, jobVersion, clearJobFinishTime);
modelSnapshotId, modelSnapshotMinVersion, jobVersion, clearJobFinishTime, allowLazyOpen);
}
}
}

View File

@ -136,7 +136,7 @@ public class MlTasksTests extends ESTestCase {
.masterNodeId("node-1")
.build();
assertThat(MlTasks.unallocatedJobIds(tasksBuilder.build(), nodes),
assertThat(MlTasks.unassignedJobIds(tasksBuilder.build(), nodes),
containsInAnyOrder("job_without_assignment", "job_without_node"));
}
@ -159,7 +159,7 @@ public class MlTasksTests extends ESTestCase {
.masterNodeId("node-1")
.build();
assertThat(MlTasks.unallocatedDatafeedIds(tasksBuilder.build(), nodes),
assertThat(MlTasks.unassignedDatafeedIds(tasksBuilder.build(), nodes),
containsInAnyOrder("datafeed_without_assignment", "datafeed_without_node"));
}

View File

@ -30,7 +30,7 @@ public class StartDataFrameAnalyticsActionTaskParamsTests extends AbstractSerial
for (int i = 0; i < phaseCount; i++) {
progressOnStart.add(new PhaseProgress(randomAlphaOfLength(10), randomIntBetween(0, 100)));
}
return new StartDataFrameAnalyticsAction.TaskParams(randomAlphaOfLength(10), Version.CURRENT, progressOnStart);
return new StartDataFrameAnalyticsAction.TaskParams(randomAlphaOfLength(10), Version.CURRENT, progressOnStart, randomBoolean());
}
@Override

View File

@ -124,6 +124,9 @@ public class DataFrameAnalyticsConfigTests extends AbstractSerializingTestCase<D
builder.setVersion(Version.CURRENT);
}
}
if (randomBoolean()) {
builder.setAllowLazyStart(randomBoolean());
}
return builder;
}

View File

@ -5,14 +5,23 @@
*/
package org.elasticsearch.xpack.core.ml.dataframe;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class DataFrameAnalyticsStateTests extends ESTestCase {
public void testFromString() {
assertThat(DataFrameAnalyticsState.fromString("starting"), equalTo(DataFrameAnalyticsState.STARTING));
assertThat(DataFrameAnalyticsState.fromString("started"), equalTo(DataFrameAnalyticsState.STARTED));
assertThat(DataFrameAnalyticsState.fromString("reindexing"), equalTo(DataFrameAnalyticsState.REINDEXING));
assertThat(DataFrameAnalyticsState.fromString("analyzing"), equalTo(DataFrameAnalyticsState.ANALYZING));
@ -22,6 +31,7 @@ public class DataFrameAnalyticsStateTests extends ESTestCase {
}
public void testToString() {
assertThat(DataFrameAnalyticsState.STARTING.toString(), equalTo("starting"));
assertThat(DataFrameAnalyticsState.STARTED.toString(), equalTo("started"));
assertThat(DataFrameAnalyticsState.REINDEXING.toString(), equalTo("reindexing"));
assertThat(DataFrameAnalyticsState.ANALYZING.toString(), equalTo("analyzing"));
@ -30,6 +40,20 @@ public class DataFrameAnalyticsStateTests extends ESTestCase {
assertThat(DataFrameAnalyticsState.FAILED.toString(), equalTo("failed"));
}
public void testWriteStartingStateToPre75() throws IOException {
StreamOutput streamOutput = mock(StreamOutput.class);
when(streamOutput.getVersion()).thenReturn(Version.V_7_4_1);
DataFrameAnalyticsState.STARTING.writeTo(streamOutput);
verify(streamOutput, times(1)).writeEnum(DataFrameAnalyticsState.STOPPED);
}
public void testWriteStartingStateToPost75() throws IOException {
StreamOutput streamOutput = mock(StreamOutput.class);
when(streamOutput.getVersion()).thenReturn(Version.V_7_5_0);
DataFrameAnalyticsState.STARTING.writeTo(streamOutput);
verify(streamOutput, times(1)).writeEnum(DataFrameAnalyticsState.STARTING);
}
public void testIsAnyOf() {
assertThat(DataFrameAnalyticsState.STARTED.isAnyOf(), is(false));
assertThat(DataFrameAnalyticsState.STARTED.isAnyOf(DataFrameAnalyticsState.STARTED), is(true));

View File

@ -109,6 +109,7 @@ public class JobTests extends AbstractSerializingTestCase<Job> {
assertNull(job.getResultsRetentionDays());
assertNotNull(job.allInputFields());
assertFalse(job.allInputFields().isEmpty());
assertFalse(job.allowLazyOpen());
}
public void testNoId() {
@ -640,6 +641,9 @@ public class JobTests extends AbstractSerializingTestCase<Job> {
if (randomBoolean()) {
builder.setResultsIndexName(randomValidJobId());
}
if (randomBoolean()) {
builder.setAllowLazyOpen(randomBoolean());
}
return builder.build();
}
}

View File

@ -96,6 +96,9 @@ public class JobUpdateTests extends AbstractSerializingTestCase<JobUpdate> {
if (useInternalParser) {
update.setClearFinishTime(randomBoolean());
}
if (randomBoolean()) {
update.setAllowLazyOpen(randomBoolean());
}
return update.build();
}

View File

@ -48,7 +48,7 @@ public class ClassificationIT extends MlNativeDataFrameAnalyticsIntegTestCase {
private String destIndex;
@After
public void cleanup() throws Exception {
public void cleanup() {
cleanUp();
}
@ -83,6 +83,7 @@ public class ClassificationIT extends MlNativeDataFrameAnalyticsIntegTestCase {
assertThatAuditMessagesMatch(jobId,
"Created analytics with analysis type [classification]",
"Estimated memory usage for this analytics to be",
"Starting analytics on node",
"Started analytics",
"Creating destination index [" + destIndex + "]",
"Finished reindexing to destination index [" + destIndex + "]",
@ -119,6 +120,7 @@ public class ClassificationIT extends MlNativeDataFrameAnalyticsIntegTestCase {
assertThatAuditMessagesMatch(jobId,
"Created analytics with analysis type [classification]",
"Estimated memory usage for this analytics to be",
"Starting analytics on node",
"Started analytics",
"Creating destination index [" + destIndex + "]",
"Finished reindexing to destination index [" + destIndex + "]",
@ -170,6 +172,7 @@ public class ClassificationIT extends MlNativeDataFrameAnalyticsIntegTestCase {
assertThatAuditMessagesMatch(jobId,
"Created analytics with analysis type [classification]",
"Estimated memory usage for this analytics to be",
"Starting analytics on node",
"Started analytics",
"Creating destination index [" + destIndex + "]",
"Finished reindexing to destination index [" + destIndex + "]",
@ -213,6 +216,7 @@ public class ClassificationIT extends MlNativeDataFrameAnalyticsIntegTestCase {
assertThatAuditMessagesMatch(jobId,
"Created analytics with analysis type [classification]",
"Estimated memory usage for this analytics to be",
"Starting analytics on node",
"Started analytics",
"Creating destination index [" + destIndex + "]",
"Finished reindexing to destination index [" + destIndex + "]",
@ -289,6 +293,7 @@ public class ClassificationIT extends MlNativeDataFrameAnalyticsIntegTestCase {
private static void assertTopClasses(Map<String, Object> resultsObject, int numTopClasses) {
assertThat(resultsObject.containsKey("top_classes"), is(true));
@SuppressWarnings("unchecked")
List<Map<String, Object>> topClasses = (List<Map<String, Object>>) resultsObject.get("top_classes");
assertThat(topClasses, hasSize(numTopClasses));
List<String> classNames = new ArrayList<>(topClasses.size());

View File

@ -42,7 +42,7 @@ public class RegressionIT extends MlNativeDataFrameAnalyticsIntegTestCase {
private String destIndex;
@After
public void cleanup() throws Exception {
public void cleanup() {
cleanUp();
}
@ -110,6 +110,7 @@ public class RegressionIT extends MlNativeDataFrameAnalyticsIntegTestCase {
assertThatAuditMessagesMatch(jobId,
"Created analytics with analysis type [regression]",
"Estimated memory usage for this analytics to be",
"Starting analytics on node",
"Started analytics",
"Creating destination index [" + destIndex + "]",
"Finished reindexing to destination index [" + destIndex + "]",
@ -161,6 +162,7 @@ public class RegressionIT extends MlNativeDataFrameAnalyticsIntegTestCase {
assertThatAuditMessagesMatch(jobId,
"Created analytics with analysis type [regression]",
"Estimated memory usage for this analytics to be",
"Starting analytics on node",
"Started analytics",
"Creating destination index [" + destIndex + "]",
"Finished reindexing to destination index [" + destIndex + "]",
@ -227,6 +229,7 @@ public class RegressionIT extends MlNativeDataFrameAnalyticsIntegTestCase {
assertThatAuditMessagesMatch(jobId,
"Created analytics with analysis type [regression]",
"Estimated memory usage for this analytics to be",
"Starting analytics on node",
"Started analytics",
"Creating destination index [" + destIndex + "]",
"Finished reindexing to destination index [" + destIndex + "]",

View File

@ -21,6 +21,7 @@ import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsDest;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsSource;
@ -32,12 +33,14 @@ import java.util.Map;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.startsWith;
public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTestCase {
@ -120,6 +123,7 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest
assertThatAuditMessagesMatch(id,
"Created analytics with analysis type [outlier_detection]",
"Estimated memory usage for this analytics to be",
"Starting analytics on node",
"Started analytics",
"Creating destination index [test-outlier-detection-with-few-docs-results]",
"Finished reindexing to destination index [test-outlier-detection-with-few-docs-results]",
@ -174,6 +178,7 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest
assertThatAuditMessagesMatch(id,
"Created analytics with analysis type [outlier_detection]",
"Estimated memory usage for this analytics to be",
"Starting analytics on node",
"Started analytics",
"Creating destination index [test-outlier-detection-with-enough-docs-to-scroll-results]",
"Finished reindexing to destination index [test-outlier-detection-with-enough-docs-to-scroll-results]",
@ -254,6 +259,7 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest
assertThatAuditMessagesMatch(id,
"Created analytics with analysis type [outlier_detection]",
"Estimated memory usage for this analytics to be",
"Starting analytics on node",
"Started analytics",
"Creating destination index [test-outlier-detection-with-more-fields-than-docvalue-limit-results]",
"Finished reindexing to destination index [test-outlier-detection-with-more-fields-than-docvalue-limit-results]",
@ -312,6 +318,7 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest
assertThatAuditMessagesMatch(id,
"Created analytics with analysis type [outlier_detection]",
"Estimated memory usage for this analytics to be",
"Starting analytics on node",
"Started analytics",
"Creating destination index [test-stop-outlier-detection-with-enough-docs-to-scroll-results]",
"Stopped analytics");
@ -377,6 +384,7 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest
assertThatAuditMessagesMatch(id,
"Created analytics with analysis type [outlier_detection]",
"Estimated memory usage for this analytics to be",
"Starting analytics on node",
"Started analytics",
"Creating destination index [test-outlier-detection-with-multiple-source-indices-results]",
"Finished reindexing to destination index [test-outlier-detection-with-multiple-source-indices-results]",
@ -434,6 +442,7 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest
assertThatAuditMessagesMatch(id,
"Created analytics with analysis type [outlier_detection]",
"Estimated memory usage for this analytics to be",
"Starting analytics on node",
"Started analytics",
"Using existing destination index [test-outlier-detection-with-pre-existing-dest-index-results]",
"Finished reindexing to destination index [test-outlier-detection-with-pre-existing-dest-index-results]",
@ -485,6 +494,60 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest
"Estimated memory usage for this analytics to be");
}
public void testLazyAssignmentWithModelMemoryLimitTooHighForAssignment() throws Exception {
String sourceIndex = "test-lazy-assign-model-memory-limit-too-high";
client().admin().indices().prepareCreate(sourceIndex)
.addMapping("_doc", "col_1", "type=double", "col_2", "type=float", "col_3", "type=keyword")
.get();
BulkRequestBuilder bulkRequestBuilder = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
IndexRequest indexRequest = new IndexRequest(sourceIndex)
.id("doc_1")
.source("col_1", 1.0, "col_2", 1.0, "col_3", "str");
bulkRequestBuilder.add(indexRequest);
BulkResponse bulkResponse = bulkRequestBuilder.get();
if (bulkResponse.hasFailures()) {
fail("Failed to index data: " + bulkResponse.buildFailureMessage());
}
String id = "test_lazy_assign_model_memory_limit_too_high";
// Assuming a 1TB job will never fit on the test machine - increase this when machines get really big!
ByteSizeValue modelMemoryLimit = new ByteSizeValue(1, ByteSizeUnit.TB);
DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder()
.setId(id)
.setSource(new DataFrameAnalyticsSource(new String[] { sourceIndex }, null))
.setDest(new DataFrameAnalyticsDest(sourceIndex + "-results", null))
.setAnalysis(new OutlierDetection.Builder().build())
.setModelMemoryLimit(modelMemoryLimit)
.setAllowLazyStart(true)
.build();
registerAnalytics(config);
putAnalytics(config);
assertState(id, DataFrameAnalyticsState.STOPPED);
// Due to lazy start being allowed, this should succeed even though no node currently in the cluster is big enough
startAnalytics(id);
// Wait until the state is STARTING; there is no node yet, but there is an assignment explanation.
assertBusy(() -> {
GetDataFrameAnalyticsStatsAction.Response.Stats stats = getAnalyticsStats(id).get(0);
assertThat(stats.getState(), equalTo(DataFrameAnalyticsState.STARTING));
assertThat(stats.getNode(), is(nullValue()));
assertThat(stats.getAssignmentExplanation(), containsString("persistent task is awaiting node assignment"));
});
stopAnalytics(id);
waitUntilAnalyticsIsStopped(id);
assertThatAuditMessagesMatch(id,
"Created analytics with analysis type [outlier_detection]",
"Estimated memory usage for this analytics to be",
"No node found to start analytics. Reasons [persistent task is awaiting node assignment.]",
"Started analytics",
"Stopped analytics");
}
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/47612")
public void testOutlierDetectionStopAndRestart() throws Exception {
String sourceIndex = "test-outlier-detection-stop-and-restart";
@ -634,6 +697,7 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest
assertThatAuditMessagesMatch(id,
"Created analytics with analysis type [outlier_detection]",
"Estimated memory usage for this analytics to be",
"Starting analytics on node",
"Started analytics",
"Creating destination index [test-outlier-detection-with-custom-params-results]",
"Finished reindexing to destination index [test-outlier-detection-with-custom-params-results]",

View File

@ -584,6 +584,8 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
this.memoryTracker.set(memoryTracker);
MlLifeCycleService mlLifeCycleService = new MlLifeCycleService(environment, clusterService, datafeedManager,
autodetectProcessManager, memoryTracker);
MlAssignmentNotifier mlAssignmentNotifier = new MlAssignmentNotifier(anomalyDetectionAuditor, dataFrameAnalyticsAuditor, threadPool,
new MlConfigMigrator(settings, client, clusterService), clusterService);
// this object registers as a license state listener, and is never removed, so there's no need to retain another reference to it
final InvalidLicenseEnforcer enforcer =
@ -602,12 +604,12 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
jobManager,
jobManagerHolder,
autodetectProcessManager,
new MlInitializationService(settings, threadPool, clusterService, client),
new MlInitializationService(settings, threadPool, clusterService, client, mlAssignmentNotifier),
jobDataCountsPersister,
datafeedManager,
anomalyDetectionAuditor,
dataFrameAnalyticsAuditor,
new MlAssignmentNotifier(settings, anomalyDetectionAuditor, threadPool, client, clusterService),
mlAssignmentNotifier,
memoryTracker,
analyticsProcessManager,
memoryEstimationProcessManager,

View File

@ -8,20 +8,21 @@ package org.elasticsearch.xpack.ml;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor;
import java.util.Objects;
@ -29,21 +30,15 @@ import java.util.Objects;
public class MlAssignmentNotifier implements ClusterStateListener {
private static final Logger logger = LogManager.getLogger(MlAssignmentNotifier.class);
private final AnomalyDetectionAuditor auditor;
private final AnomalyDetectionAuditor anomalyDetectionAuditor;
private final DataFrameAnalyticsAuditor dataFrameAnalyticsAuditor;
private final MlConfigMigrator mlConfigMigrator;
private final ThreadPool threadPool;
MlAssignmentNotifier(Settings settings, AnomalyDetectionAuditor auditor, ThreadPool threadPool, Client client,
ClusterService clusterService) {
this.auditor = auditor;
this.mlConfigMigrator = new MlConfigMigrator(settings, client, clusterService);
this.threadPool = threadPool;
clusterService.addListener(this);
}
MlAssignmentNotifier(AnomalyDetectionAuditor auditor, ThreadPool threadPool, MlConfigMigrator mlConfigMigrator,
ClusterService clusterService) {
this.auditor = auditor;
MlAssignmentNotifier(AnomalyDetectionAuditor anomalyDetectionAuditor, DataFrameAnalyticsAuditor dataFrameAnalyticsAuditor,
ThreadPool threadPool, MlConfigMigrator mlConfigMigrator, ClusterService clusterService) {
this.anomalyDetectionAuditor = anomalyDetectionAuditor;
this.dataFrameAnalyticsAuditor = dataFrameAnalyticsAuditor;
this.mlConfigMigrator = mlConfigMigrator;
this.threadPool = threadPool;
clusterService.addListener(this);
@ -75,43 +70,75 @@ public class MlAssignmentNotifier implements ClusterStateListener {
return;
}
PersistentTasksCustomMetaData previous = event.previousState().getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
PersistentTasksCustomMetaData current = event.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
PersistentTasksCustomMetaData previousTasks = event.previousState().getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
PersistentTasksCustomMetaData currentTasks = event.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
if (Objects.equals(previous, current)) {
if (Objects.equals(previousTasks, currentTasks)) {
return;
}
for (PersistentTask<?> currentTask : current.tasks()) {
auditMlTasks(event.state().nodes(), previousTasks, currentTasks, false);
}
/**
* Creates an audit warning for all currently unassigned ML
* tasks, even if a previous audit warning has been created.
* Care must be taken not to call this method frequently.
*/
public void auditUnassignedMlTasks(DiscoveryNodes nodes, PersistentTasksCustomMetaData tasks) {
auditMlTasks(nodes, tasks, tasks, true);
}
private void auditMlTasks(DiscoveryNodes nodes, PersistentTasksCustomMetaData previousTasks, PersistentTasksCustomMetaData currentTasks,
boolean alwaysAuditUnassigned) {
for (PersistentTask<?> currentTask : currentTasks.tasks()) {
Assignment currentAssignment = currentTask.getAssignment();
PersistentTask<?> previousTask = previous != null ? previous.getTask(currentTask.getId()) : null;
PersistentTask<?> previousTask = previousTasks != null ? previousTasks.getTask(currentTask.getId()) : null;
Assignment previousAssignment = previousTask != null ? previousTask.getAssignment() : null;
if (Objects.equals(currentAssignment, previousAssignment)) {
boolean isTaskAssigned = (currentAssignment.getExecutorNode() != null);
if (Objects.equals(currentAssignment, previousAssignment) &&
(isTaskAssigned || alwaysAuditUnassigned == false)) {
continue;
}
if (MlTasks.JOB_TASK_NAME.equals(currentTask.getTaskName())) {
String jobId = ((OpenJobAction.JobParams) currentTask.getParams()).getJobId();
if (currentAssignment.getExecutorNode() == null) {
auditor.warning(jobId, "No node found to open job. Reasons [" + currentAssignment.getExplanation() + "]");
if (isTaskAssigned) {
DiscoveryNode node = nodes.get(currentAssignment.getExecutorNode());
anomalyDetectionAuditor.info(jobId, "Opening job on node [" + node.toString() + "]");
} else {
DiscoveryNode node = event.state().nodes().get(currentAssignment.getExecutorNode());
auditor.info(jobId, "Opening job on node [" + node.toString() + "]");
anomalyDetectionAuditor.warning(jobId,
"No node found to open job. Reasons [" + currentAssignment.getExplanation() + "]");
}
} else if (MlTasks.DATAFEED_TASK_NAME.equals(currentTask.getTaskName())) {
StartDatafeedAction.DatafeedParams datafeedParams = (StartDatafeedAction.DatafeedParams) currentTask.getParams();
String jobId = datafeedParams.getJobId();
if (currentAssignment.getExecutorNode() == null) {
String msg = "No node found to start datafeed [" + datafeedParams.getDatafeedId() +"]. Reasons [" +
currentAssignment.getExplanation() + "]";
logger.warn("[{}] {}", jobId, msg);
if (isTaskAssigned) {
DiscoveryNode node = nodes.get(currentAssignment.getExecutorNode());
if (jobId != null) {
auditor.warning(jobId, msg);
anomalyDetectionAuditor.info(jobId,
"Starting datafeed [" + datafeedParams.getDatafeedId() + "] on node [" + node + "]");
}
} else {
DiscoveryNode node = event.state().nodes().get(currentAssignment.getExecutorNode());
if (jobId != null) {
auditor.info(jobId, "Starting datafeed [" + datafeedParams.getDatafeedId() + "] on node [" + node + "]");
String msg = "No node found to start datafeed [" + datafeedParams.getDatafeedId() +"]. Reasons [" +
currentAssignment.getExplanation() + "]";
if (alwaysAuditUnassigned == false) {
logger.warn("[{}] {}", jobId, msg);
}
if (jobId != null) {
anomalyDetectionAuditor.warning(jobId, msg);
}
}
} else if (MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME.equals(currentTask.getTaskName())) {
String id = ((StartDataFrameAnalyticsAction.TaskParams) currentTask.getParams()).getId();
if (isTaskAssigned) {
DiscoveryNode node = nodes.get(currentAssignment.getExecutorNode());
dataFrameAnalyticsAuditor.info(id, "Starting analytics on node [" + node.toString() + "]");
} else {
dataFrameAnalyticsAuditor.warning(id,
"No node found to start analytics. Reasons [" + currentAssignment.getExplanation() + "]");
}
}
}
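
A hedged recap of the two entry points into auditMlTasks above; the method names come from this class, while the notifier, event and state variables are assumed wiring.

// 1. Cluster-state delta: audit only tasks whose assignment actually changed.
notifier.clusterChanged(event);                        // internally: auditMlTasks(nodes, previousTasks, currentTasks, false)
// 2. Nightly maintenance sweep: re-audit every currently unassigned task, even if unchanged.
notifier.auditUnassignedMlTasks(state.nodes(), tasks); // internally: auditMlTasks(nodes, tasks, tasks, true)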

View File

@ -102,7 +102,7 @@ public class MlConfigMigrationEligibilityCheck {
PersistentTasksCustomMetaData persistentTasks = clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE);
return MlTasks.openJobIds(persistentTasks).contains(jobId) == false ||
MlTasks.unallocatedJobIds(persistentTasks, clusterState.nodes()).contains(jobId);
MlTasks.unassignedJobIds(persistentTasks, clusterState.nodes()).contains(jobId);
}
/**
@ -129,6 +129,6 @@ public class MlConfigMigrationEligibilityCheck {
PersistentTasksCustomMetaData persistentTasks = clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE);
return MlTasks.startedDatafeedIds(persistentTasks).contains(datafeedId) == false
|| MlTasks.unallocatedDatafeedIds(persistentTasks, clusterState.nodes()).contains(datafeedId);
|| MlTasks.unassignedDatafeedIds(persistentTasks, clusterState.nodes()).contains(datafeedId);
}
}

View File

@ -296,9 +296,9 @@ public class MlConfigMigrator {
PersistentTasksCustomMetaData currentTasks,
DiscoveryNodes nodes) {
Collection<PersistentTasksCustomMetaData.PersistentTask<?>> unallocatedJobTasks = MlTasks.unallocatedJobTasks(currentTasks, nodes);
Collection<PersistentTasksCustomMetaData.PersistentTask<?>> unallocatedJobTasks = MlTasks.unassignedJobTasks(currentTasks, nodes);
Collection<PersistentTasksCustomMetaData.PersistentTask<?>> unallocatedDatafeedsTasks =
MlTasks.unallocatedDatafeedTasks(currentTasks, nodes);
MlTasks.unassignedDatafeedTasks(currentTasks, nodes);
if (unallocatedJobTasks.isEmpty() && unallocatedDatafeedsTasks.isEmpty()) {
return currentTasks;
@ -561,7 +561,7 @@ public class MlConfigMigrator {
public static List<Job> closedOrUnallocatedJobs(ClusterState clusterState) {
PersistentTasksCustomMetaData persistentTasks = clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE);
Set<String> openJobIds = MlTasks.openJobIds(persistentTasks);
openJobIds.removeAll(MlTasks.unallocatedJobIds(persistentTasks, clusterState.nodes()));
openJobIds.removeAll(MlTasks.unassignedJobIds(persistentTasks, clusterState.nodes()));
MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState);
return mlMetadata.getJobs().values().stream()
@ -581,7 +581,7 @@ public class MlConfigMigrator {
public static List<DatafeedConfig> stopppedOrUnallocatedDatafeeds(ClusterState clusterState) {
PersistentTasksCustomMetaData persistentTasks = clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE);
Set<String> startedDatafeedIds = MlTasks.startedDatafeedIds(persistentTasks);
startedDatafeedIds.removeAll(MlTasks.unallocatedDatafeedIds(persistentTasks, clusterState.nodes()));
startedDatafeedIds.removeAll(MlTasks.unassignedDatafeedIds(persistentTasks, clusterState.nodes()));
MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState);
return mlMetadata.getDatafeeds().values().stream()

View File

@ -10,9 +10,12 @@ import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.action.DeleteExpiredDataAction;
@ -37,6 +40,8 @@ public class MlDailyMaintenanceService implements Releasable {
private final ThreadPool threadPool;
private final Client client;
private final ClusterService clusterService;
private final MlAssignmentNotifier mlAssignmentNotifier;
/**
* An interface to abstract the calculation of the delay to the next execution.
@ -46,14 +51,18 @@ public class MlDailyMaintenanceService implements Releasable {
private volatile Scheduler.Cancellable cancellable;
MlDailyMaintenanceService(ThreadPool threadPool, Client client, Supplier<TimeValue> scheduleProvider) {
MlDailyMaintenanceService(ThreadPool threadPool, Client client, ClusterService clusterService,
MlAssignmentNotifier mlAssignmentNotifier, Supplier<TimeValue> scheduleProvider) {
this.threadPool = Objects.requireNonNull(threadPool);
this.client = Objects.requireNonNull(client);
this.clusterService = Objects.requireNonNull(clusterService);
this.mlAssignmentNotifier = Objects.requireNonNull(mlAssignmentNotifier);
this.schedulerProvider = Objects.requireNonNull(scheduleProvider);
}
public MlDailyMaintenanceService(ClusterName clusterName, ThreadPool threadPool, Client client) {
this(threadPool, client, () -> delayToNextTime(clusterName));
public MlDailyMaintenanceService(ClusterName clusterName, ThreadPool threadPool, Client client, ClusterService clusterService,
MlAssignmentNotifier mlAssignmentNotifier) {
this(threadPool, client, clusterService, mlAssignmentNotifier, () -> delayToNextTime(clusterName));
}
/**
@ -113,17 +122,34 @@ public class MlDailyMaintenanceService implements Releasable {
}
private void triggerTasks() {
LOGGER.info("triggering scheduled [ML] maintenance tasks");
executeAsyncWithOrigin(client, ML_ORIGIN, DeleteExpiredDataAction.INSTANCE, new DeleteExpiredDataAction.Request(),
try {
LOGGER.info("triggering scheduled [ML] maintenance tasks");
executeAsyncWithOrigin(client, ML_ORIGIN, DeleteExpiredDataAction.INSTANCE, new DeleteExpiredDataAction.Request(),
ActionListener.wrap(
response -> {
if (response.isDeleted()) {
LOGGER.info("Successfully completed [ML] maintenance tasks");
} else {
LOGGER.info("Halting [ML] maintenance tasks before completion as elapsed time is too great");
}
},
e -> LOGGER.error("An error occurred during maintenance tasks execution", e)));
scheduleNext();
response -> {
if (response.isDeleted()) {
LOGGER.info("Successfully completed [ML] maintenance tasks");
} else {
LOGGER.info("Halting [ML] maintenance tasks before completion as elapsed time is too great");
}
},
e -> LOGGER.error("An error occurred during maintenance tasks execution", e)));
auditUnassignedMlTasks(clusterService.state());
} finally {
scheduleNext();
}
}
/**
* The idea of this is that if tasks are unassigned for days on end then they'll get a duplicate
* audit warning every day, and that will mean they'll permanently have a yellow triangle next
* to their entries in the UI jobs list. (This functionality may need revisiting if the condition
* for displaying a yellow triangle in the UI jobs list changes.)
*/
private void auditUnassignedMlTasks(ClusterState state) {
PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
if (tasks != null) {
mlAssignmentNotifier.auditUnassignedMlTasks(state.nodes(), tasks);
}
}
}

View File

@ -19,6 +19,7 @@ import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
class MlInitializationService implements LocalNodeMasterListener, ClusterStateListener {
@ -29,15 +30,18 @@ class MlInitializationService implements LocalNodeMasterListener, ClusterStateLi
private final ThreadPool threadPool;
private final ClusterService clusterService;
private final Client client;
private final MlAssignmentNotifier mlAssignmentNotifier;
private final AtomicBoolean isIndexCreationInProgress = new AtomicBoolean(false);
private volatile MlDailyMaintenanceService mlDailyMaintenanceService;
MlInitializationService(Settings settings, ThreadPool threadPool, ClusterService clusterService, Client client) {
this.settings = settings;
this.threadPool = threadPool;
this.clusterService = clusterService;
this.client = client;
MlInitializationService(Settings settings, ThreadPool threadPool, ClusterService clusterService, Client client,
MlAssignmentNotifier mlAssignmentNotifier) {
this.settings = Objects.requireNonNull(settings);
this.threadPool = Objects.requireNonNull(threadPool);
this.clusterService = Objects.requireNonNull(clusterService);
this.client = Objects.requireNonNull(client);
this.mlAssignmentNotifier = Objects.requireNonNull(mlAssignmentNotifier);
clusterService.addListener(this);
clusterService.addLocalNodeMasterListener(this);
}
@ -83,7 +87,8 @@ class MlInitializationService implements LocalNodeMasterListener, ClusterStateLi
private synchronized void installDailyMaintenanceService() {
if (mlDailyMaintenanceService == null) {
mlDailyMaintenanceService = new MlDailyMaintenanceService(clusterService.getClusterName(), threadPool, client);
mlDailyMaintenanceService =
new MlDailyMaintenanceService(clusterService.getClusterName(), threadPool, client, clusterService, mlAssignmentNotifier);
mlDailyMaintenanceService.start();
clusterService.addLifecycleListener(new LifecycleListener() {
@Override

View File

@ -116,30 +116,43 @@ public class TransportCloseJobAction extends TransportTasksAction<TransportOpenJ
return;
}
if (request.isForce() == false) {
Set<String> executorNodes = new HashSet<>();
PersistentTasksCustomMetaData tasks = state.metaData().custom(PersistentTasksCustomMetaData.TYPE);
for (String resolvedJobId : request.getOpenJobIds()) {
PersistentTasksCustomMetaData.PersistentTask<?> jobTask =
MlTasks.getJobTask(resolvedJobId, tasks);
if (jobTask == null || jobTask.isAssigned() == false) {
String message = "Cannot close job [" + resolvedJobId + "] because the job does not have "
+ "an assigned node. Use force close to close the job";
listener.onFailure(ExceptionsHelper.conflictStatusException(message));
return;
} else {
executorNodes.add(jobTask.getExecutorNode());
}
}
request.setNodes(executorNodes.toArray(new String[executorNodes.size()]));
}
if (request.isForce()) {
List<String> jobIdsToForceClose = new ArrayList<>(response.openJobIds);
jobIdsToForceClose.addAll(response.closingJobIds);
forceCloseJob(state, request, jobIdsToForceClose, listener);
} else {
Set<String> executorNodes = new HashSet<>();
PersistentTasksCustomMetaData tasks = state.metaData().custom(PersistentTasksCustomMetaData.TYPE);
for (String resolvedJobId : request.getOpenJobIds()) {
PersistentTasksCustomMetaData.PersistentTask<?> jobTask =
MlTasks.getJobTask(resolvedJobId, tasks);
if (jobTask == null) {
// This should not happen, because openJobIds was
// derived from the same tasks metadata as jobTask
String msg = "Requested job [" + resolvedJobId
+ "] be stopped, but job's task could not be found.";
assert jobTask != null : msg;
logger.error(msg);
} else if (jobTask.isAssigned()) {
executorNodes.add(jobTask.getExecutorNode());
} else {
// This is the easy case - the job is not currently assigned to a node, so can
// be gracefully stopped simply by removing its persistent task. (Usually a
// graceful stop cannot be achieved by simply removing the persistent task, but
// if the job has no running code then graceful/forceful are basically the same.)
// The listener here can be a no-op, as waitForJobClosed() already waits for
// these persistent tasks to disappear.
persistentTasksService.sendRemoveRequest(jobTask.getId(),
ActionListener.wrap(
r -> logger.trace("[{}] removed task to close unassigned job", resolvedJobId),
e -> logger.error("[" + resolvedJobId
+ "] failed to remove task to close unassigned job", e)
));
}
}
request.setNodes(executorNodes.toArray(new String[0]));
normalCloseJob(state, task, request, response.openJobIds, response.closingJobIds, listener);
}
},
@ -148,7 +161,6 @@ public class TransportCloseJobAction extends TransportTasksAction<TransportOpenJ
},
listener::onFailure
));
}
}
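
Seen from the caller's side, the non-force branch above now behaves as follows. This is a hedged sketch: the action classes are the real ones used in this change, the client() helper is the usual integration-test accessor, and the MlDistributedFailureIT change near the end of this diff exercises the same path.

// Hedged sketch: a normal (non-force) close succeeds for an open job with no
// assigned node, because its persistent task is simply removed.
CloseJobAction.Request closeJobRequest = new CloseJobAction.Request("unassigned-job");
closeJobRequest.setForce(false);   // force is no longer required for unassigned jobs
CloseJobAction.Response closeJobResponse = client().execute(CloseJobAction.INSTANCE, closeJobRequest).actionGet();
assertTrue(closeJobResponse.isClosed());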
@ -353,21 +365,20 @@ public class TransportCloseJobAction extends TransportTasksAction<TransportOpenJ
private void sendResponseOrFailure(String jobId,
ActionListener<CloseJobAction.Response> listener,
AtomicArray<Exception> failures) {
List<Exception> catchedExceptions = failures.asList();
if (catchedExceptions.size() == 0) {
List<Exception> caughtExceptions = failures.asList();
if (caughtExceptions.size() == 0) {
listener.onResponse(new CloseJobAction.Response(true));
return;
}
String msg = "Failed to force close job [" + jobId + "] with ["
+ catchedExceptions.size()
+ caughtExceptions.size()
+ "] failures, rethrowing last, all Exceptions: ["
+ catchedExceptions.stream().map(Exception::getMessage)
+ caughtExceptions.stream().map(Exception::getMessage)
.collect(Collectors.joining(", "))
+ "]";
ElasticsearchException e = new ElasticsearchException(msg,
catchedExceptions.get(0));
ElasticsearchException e = new ElasticsearchException(msg, caughtExceptions.get(0));
listener.onFailure(e);
}
});

View File

@ -372,7 +372,8 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
// If the task parameters do not have a job field then the job
// was first opened on a pre v6.6 node and has not been migrated
if (params.getJob() == null) {
Job job = params.getJob();
if (job == null) {
return AWAITING_MIGRATION;
}
@ -401,9 +402,8 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
}
}
Job job = params.getJob();
JobNodeSelector jobNodeSelector = new JobNodeSelector(clusterState, jobId, MlTasks.JOB_TASK_NAME, memoryTracker,
maxLazyMLNodes, node -> nodeFilter(node, job));
job.allowLazyOpen() ? Integer.MAX_VALUE : maxLazyMLNodes, node -> nodeFilter(node, job));
return jobNodeSelector.selectNode(
maxOpenJobs, maxConcurrentJobAllocations, maxMachineMemoryPercent, isMemoryTrackerRecentlyRefreshed);
}
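
Restating the gate added above as a hedged sketch (all names are from this method and this change; MAX_LAZY_ML_NODES is the cluster-wide setting referenced in the tests further down):

// A job with allow_lazy_open may wait for capacity regardless of the
// cluster-wide lazy-node budget, so the selector gets an unlimited budget.
int effectiveMaxLazyNodes = job.allowLazyOpen() ? Integer.MAX_VALUE : maxLazyMLNodes;
JobNodeSelector jobNodeSelector = new JobNodeSelector(clusterState, jobId, MlTasks.JOB_TASK_NAME, memoryTracker,
    effectiveMaxLazyNodes, node -> nodeFilter(node, job));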
@ -554,6 +554,11 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
}
}
switch (jobState) {
// The OPENING case here is expected to be incredibly short-lived, just occurring during the
// time period when a job has successfully been assigned to a node but the request to update
// its task state is still in-flight. (The long-lived OPENING case when a lazy node needs to
be added to the cluster to accommodate the job was dealt with higher up in this method when the
// magic AWAITING_LAZY_ASSIGNMENT assignment was checked for.)
case OPENING:
case CLOSED:
return false;

View File

@ -165,7 +165,8 @@ public class TransportStartDataFrameAnalyticsAction
ActionListener<StartContext> memoryUsageHandledListener = ActionListener.wrap(
startContext -> {
StartDataFrameAnalyticsAction.TaskParams taskParams = new StartDataFrameAnalyticsAction.TaskParams(
request.getId(), startContext.config.getVersion(), startContext.progressOnStart);
request.getId(), startContext.config.getVersion(), startContext.progressOnStart,
startContext.config.isAllowLazyStart());
persistentTasksService.sendStartRequest(MlTasks.dataFrameAnalyticsTaskId(request.getId()),
MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, taskParams, waitForAnalyticsToStart);
},
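
A hedged sketch of the expanded task parameters built above. The four-argument constructor matches its other uses in this change; the literal values are illustrative.

// allow_lazy_start now travels inside the persistent task params, so the node
// selector can honour it on every (re)assignment attempt.
StartDataFrameAnalyticsAction.TaskParams taskParams = new StartDataFrameAnalyticsAction.TaskParams(
    "my-analytics", Version.CURRENT, Collections.emptyList(), /* allowLazyStart */ true);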
@ -431,6 +432,12 @@ public class TransportStartDataFrameAnalyticsAction
case STOPPING:
exception = ExceptionsHelper.conflictStatusException("the task has been stopped while waiting to be started");
return true;
// The STARTING case here is expected to be incredibly short-lived, just occurring during the
// time period when a job has successfully been assigned to a node but the request to update
// its task state is still in-flight. (The long-lived STARTING case when a lazy node needs to
be added to the cluster to accommodate the job was dealt with higher up in this method when the
// magic AWAITING_LAZY_ASSIGNMENT assignment was checked for.)
case STARTING:
case STOPPED:
return false;
case FAILED:
@ -547,7 +554,7 @@ public class TransportStartDataFrameAnalyticsAction
}
JobNodeSelector jobNodeSelector = new JobNodeSelector(clusterState, id, MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, memoryTracker,
maxLazyMLNodes, node -> nodeFilter(node, id));
params.isAllowLazyStart() ? Integer.MAX_VALUE : maxLazyMLNodes, node -> nodeFilter(node, id));
// Pass an effectively infinite value for max concurrent opening jobs, because data frame analytics jobs do
// not have an "opening" state, so they would never be rejected for causing too many jobs in the "opening" state
return jobNodeSelector.selectNode(

View File

@ -140,6 +140,7 @@ public class TransportStopDataFrameAnalyticsAction
Set<String> failedAnalytics) {
for (String analyticsId : analyticsIds) {
switch (MlTasks.getDataFrameAnalyticsState(analyticsId, tasks)) {
case STARTING:
case STARTED:
case REINDEXING:
case ANALYZING:

View File

@ -19,6 +19,7 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor;
import org.junit.Before;
import java.net.InetAddress;
@ -38,7 +39,8 @@ import static org.mockito.Mockito.when;
public class MlAssignmentNotifierTests extends ESTestCase {
private AnomalyDetectionAuditor auditor;
private AnomalyDetectionAuditor anomalyDetectionAuditor;
private DataFrameAnalyticsAuditor dataFrameAnalyticsAuditor;
private ClusterService clusterService;
private ThreadPool threadPool;
private MlConfigMigrator configMigrator;
@ -46,7 +48,8 @@ public class MlAssignmentNotifierTests extends ESTestCase {
@Before
@SuppressWarnings("unchecked")
private void setupMocks() {
auditor = mock(AnomalyDetectionAuditor.class);
anomalyDetectionAuditor = mock(AnomalyDetectionAuditor.class);
dataFrameAnalyticsAuditor = mock(DataFrameAnalyticsAuditor.class);
clusterService = mock(ClusterService.class);
threadPool = mock(ThreadPool.class);
configMigrator = mock(MlConfigMigrator.class);
@ -67,7 +70,8 @@ public class MlAssignmentNotifierTests extends ESTestCase {
}
public void testClusterChanged_info() {
MlAssignmentNotifier notifier = new MlAssignmentNotifier(auditor, threadPool, configMigrator, clusterService);
MlAssignmentNotifier notifier = new MlAssignmentNotifier(anomalyDetectionAuditor, dataFrameAnalyticsAuditor, threadPool,
configMigrator, clusterService);
ClusterState previous = ClusterState.builder(new ClusterName("_name"))
.metaData(MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE,
@ -86,7 +90,7 @@ public class MlAssignmentNotifierTests extends ESTestCase {
.masterNodeId("_node_id"))
.build();
notifier.clusterChanged(new ClusterChangedEvent("_test", newState, previous));
verify(auditor, times(1)).info(eq("job_id"), any());
verify(anomalyDetectionAuditor, times(1)).info(eq("job_id"), any());
verify(configMigrator, times(1)).migrateConfigs(eq(newState), any());
// no longer master
@ -96,11 +100,12 @@ public class MlAssignmentNotifierTests extends ESTestCase {
.add(new DiscoveryNode("_node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.CURRENT)))
.build();
notifier.clusterChanged(new ClusterChangedEvent("_test", newState, previous));
verifyNoMoreInteractions(auditor);
verifyNoMoreInteractions(anomalyDetectionAuditor);
}
public void testClusterChanged_warning() {
MlAssignmentNotifier notifier = new MlAssignmentNotifier(auditor, threadPool, configMigrator, clusterService);
MlAssignmentNotifier notifier = new MlAssignmentNotifier(anomalyDetectionAuditor, dataFrameAnalyticsAuditor, threadPool,
configMigrator, clusterService);
ClusterState previous = ClusterState.builder(new ClusterName("_name"))
.metaData(MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE,
@ -119,7 +124,7 @@ public class MlAssignmentNotifierTests extends ESTestCase {
.masterNodeId("_node_id"))
.build();
notifier.clusterChanged(new ClusterChangedEvent("_test", newState, previous));
verify(auditor, times(1)).warning(eq("job_id"), any());
verify(anomalyDetectionAuditor, times(1)).warning(eq("job_id"), any());
verify(configMigrator, times(1)).migrateConfigs(eq(newState), any());
// no longer master
@ -130,11 +135,12 @@ public class MlAssignmentNotifierTests extends ESTestCase {
.build();
notifier.clusterChanged(new ClusterChangedEvent("_test", newState, previous));
verifyNoMoreInteractions(auditor);
verifyNoMoreInteractions(anomalyDetectionAuditor);
}
public void testClusterChanged_noPersistentTaskChanges() {
MlAssignmentNotifier notifier = new MlAssignmentNotifier(auditor, threadPool, configMigrator, clusterService);
MlAssignmentNotifier notifier = new MlAssignmentNotifier(anomalyDetectionAuditor, dataFrameAnalyticsAuditor, threadPool,
configMigrator, clusterService);
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
addJobTask("job_id", null, null, tasksBuilder);
@ -154,7 +160,7 @@ public class MlAssignmentNotifierTests extends ESTestCase {
notifier.clusterChanged(new ClusterChangedEvent("_test", newState, previous));
verify(configMigrator, times(1)).migrateConfigs(any(), any());
verifyNoMoreInteractions(auditor);
verifyNoMoreInteractions(anomalyDetectionAuditor);
// no longer master
newState = ClusterState.builder(new ClusterName("_name"))

View File

@ -6,7 +6,13 @@
package org.elasticsearch.xpack.ml;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
@ -24,20 +30,29 @@ import static org.mockito.Matchers.same;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class MlDailyManagementServiceTests extends ESTestCase {
public class MlDailyMaintenanceServiceTests extends ESTestCase {
private ThreadPool threadPool;
private Client client;
private ClusterService clusterService;
private MlAssignmentNotifier mlAssignmentNotifier;
@Before
public void setUpTests() {
threadPool = new TestThreadPool("MlDailyManagementServiceTests");
threadPool = new TestThreadPool("MlDailyMaintenanceServiceTests");
client = mock(Client.class);
when(client.threadPool()).thenReturn(threadPool);
clusterService = mock(ClusterService.class);
ClusterState state = ClusterState.builder(new ClusterName("MlDailyMaintenanceServiceTests"))
.metaData(MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, PersistentTasksCustomMetaData.builder().build()))
.nodes(DiscoveryNodes.builder().build())
.build();
when(clusterService.state()).thenReturn(state);
mlAssignmentNotifier = mock(MlAssignmentNotifier.class);
}
@After
public void stop() throws InterruptedException {
public void stop() {
terminate(threadPool);
}
@ -50,12 +65,13 @@ public class MlDailyManagementServiceTests extends ESTestCase {
}
verify(client, Mockito.atLeast(triggerCount - 1)).execute(same(DeleteExpiredDataAction.INSTANCE), any(), any());
verify(mlAssignmentNotifier, Mockito.atLeast(triggerCount - 1)).auditUnassignedMlTasks(any(), any());
}
private MlDailyMaintenanceService createService(CountDownLatch latch, Client client) {
return new MlDailyMaintenanceService(threadPool, client, () -> {
return new MlDailyMaintenanceService(threadPool, client, clusterService, mlAssignmentNotifier, () -> {
latch.countDown();
return TimeValue.timeValueMillis(100);
});
}
}
}

View File

@ -32,6 +32,7 @@ public class MlInitializationServiceTests extends ESTestCase {
private ExecutorService executorService;
private ClusterService clusterService;
private Client client;
private MlAssignmentNotifier mlAssignmentNotifier;
@Before
public void setUpMocks() {
@ -39,6 +40,7 @@ public class MlInitializationServiceTests extends ESTestCase {
executorService = mock(ExecutorService.class);
clusterService = mock(ClusterService.class);
client = mock(Client.class);
mlAssignmentNotifier = mock(MlAssignmentNotifier.class);
doAnswer(invocation -> {
((Runnable) invocation.getArguments()[0]).run();
@ -53,19 +55,22 @@ public class MlInitializationServiceTests extends ESTestCase {
}
public void testInitialize() {
MlInitializationService initializationService = new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client);
MlInitializationService initializationService =
new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client, mlAssignmentNotifier);
initializationService.onMaster();
assertThat(initializationService.getDailyMaintenanceService().isStarted(), is(true));
}
public void testInitialize_noMasterNode() {
MlInitializationService initializationService = new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client);
MlInitializationService initializationService =
new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client, mlAssignmentNotifier);
initializationService.offMaster();
assertThat(initializationService.getDailyMaintenanceService(), is(nullValue()));
}
public void testInitialize_alreadyInitialized() {
MlInitializationService initializationService = new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client);
MlInitializationService initializationService =
new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client, mlAssignmentNotifier);
MlDailyMaintenanceService initialDailyMaintenanceService = mock(MlDailyMaintenanceService.class);
initializationService.setDailyMaintenanceService(initialDailyMaintenanceService);
initializationService.onMaster();
@ -74,7 +79,8 @@ public class MlInitializationServiceTests extends ESTestCase {
}
public void testNodeGoesFromMasterToNonMasterAndBack() {
MlInitializationService initializationService = new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client);
MlInitializationService initializationService =
new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client, mlAssignmentNotifier);
MlDailyMaintenanceService initialDailyMaintenanceService = mock(MlDailyMaintenanceService.class);
initializationService.setDailyMaintenanceService(initialDailyMaintenanceService);

View File

@ -49,6 +49,7 @@ import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields;
import org.elasticsearch.xpack.core.ml.notifications.AuditorField;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.job.JobNodeSelector;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
import org.elasticsearch.xpack.ml.process.MlMemoryTracker;
@ -186,6 +187,35 @@ public class TransportOpenJobActionTests extends ESTestCase {
executor.getAssignment(params, csBuilder.build()).getExplanation());
}
public void testGetAssignment_GivenLazyJobAndNoGlobalLazyNodes() {
Settings settings = Settings.builder().put(MachineLearning.MAX_LAZY_ML_NODES.getKey(), 0).build();
ClusterService clusterService = mock(ClusterService.class);
ClusterSettings clusterSettings = new ClusterSettings(settings,
Sets.newHashSet(MachineLearning.CONCURRENT_JOB_ALLOCATIONS, MachineLearning.MAX_MACHINE_MEMORY_PERCENT,
MachineLearning.MAX_LAZY_ML_NODES, MachineLearning.MAX_OPEN_JOBS_PER_NODE)
);
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name"));
MetaData.Builder metaData = MetaData.builder();
RoutingTable.Builder routingTable = RoutingTable.builder();
addIndices(metaData, routingTable);
csBuilder.metaData(metaData);
csBuilder.routingTable(routingTable.build());
TransportOpenJobAction.OpenJobPersistentTasksExecutor executor = new TransportOpenJobAction.OpenJobPersistentTasksExecutor(
settings, clusterService, mock(AutodetectProcessManager.class), mock(MlMemoryTracker.class), mock(Client.class));
Job job = mock(Job.class);
when(job.allowLazyOpen()).thenReturn(true);
OpenJobAction.JobParams params = new OpenJobAction.JobParams("lazy_job");
params.setJob(job);
Assignment assignment = executor.getAssignment(params, csBuilder.build());
assertNotNull(assignment);
assertNull(assignment.getExecutorNode());
assertEquals(JobNodeSelector.AWAITING_LAZY_ASSIGNMENT.getExplanation(), assignment.getExplanation());
}
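
For contrast, a hedged sketch of the non-lazy expectation under the same zero-lazy-node settings. The setup mirrors the test above; since the exact explanation text for the non-lazy case is not shown in this change, only the absence of an executor node is asserted.

Job nonLazyJob = mock(Job.class);
when(nonLazyJob.allowLazyOpen()).thenReturn(false);
OpenJobAction.JobParams nonLazyParams = new OpenJobAction.JobParams("non_lazy_job");
nonLazyParams.setJob(nonLazyJob);
Assignment nonLazyAssignment = executor.getAssignment(nonLazyParams, csBuilder.build());
assertNull(nonLazyAssignment.getExecutorNode());   // still unassigned, but not AWAITING_LAZY_ASSIGNMENT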
public static void addJobTask(String jobId, String nodeId, JobState jobState, PersistentTasksCustomMetaData.Builder builder) {
addJobTask(jobId, nodeId, jobState, builder, false);
}

View File

@ -27,6 +27,7 @@ public class TransportStopDataFrameAnalyticsActionTests extends ESTestCase {
public void testFindAnalyticsToStop_GivenOneFailedTaskAndNotForce() {
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
addAnalyticsTask(tasksBuilder, "starting", "foo-node", null);
addAnalyticsTask(tasksBuilder, "started", "foo-node", DataFrameAnalyticsState.STARTED);
addAnalyticsTask(tasksBuilder, "reindexing", "foo-node", DataFrameAnalyticsState.REINDEXING);
addAnalyticsTask(tasksBuilder, "analyzing", "foo-node", DataFrameAnalyticsState.ANALYZING);
@ -34,7 +35,7 @@ public class TransportStopDataFrameAnalyticsActionTests extends ESTestCase {
addAnalyticsTask(tasksBuilder, "stopped", "foo-node", DataFrameAnalyticsState.STOPPED);
addAnalyticsTask(tasksBuilder, "failed", "foo-node", DataFrameAnalyticsState.FAILED);
Set<String> ids = new HashSet<>(Arrays.asList("started", "reindexing", "analyzing", "stopping", "stopped", "failed"));
Set<String> ids = new HashSet<>(Arrays.asList("starting", "started", "reindexing", "analyzing", "stopping", "stopped", "failed"));
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
() -> TransportStopDataFrameAnalyticsAction.findAnalyticsToStop(tasksBuilder.build(), ids, false));
@ -59,6 +60,7 @@ public class TransportStopDataFrameAnalyticsActionTests extends ESTestCase {
public void testFindAnalyticsToStop_GivenFailedTaskAndForce() {
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
addAnalyticsTask(tasksBuilder, "starting", "foo-node", null);
addAnalyticsTask(tasksBuilder, "started", "foo-node", DataFrameAnalyticsState.STARTED);
addAnalyticsTask(tasksBuilder, "reindexing", "foo-node", DataFrameAnalyticsState.REINDEXING);
addAnalyticsTask(tasksBuilder, "analyzing", "foo-node", DataFrameAnalyticsState.ANALYZING);
@ -66,21 +68,27 @@ public class TransportStopDataFrameAnalyticsActionTests extends ESTestCase {
addAnalyticsTask(tasksBuilder, "stopped", "foo-node", DataFrameAnalyticsState.STOPPED);
addAnalyticsTask(tasksBuilder, "failed", "foo-node", DataFrameAnalyticsState.FAILED);
Set<String> ids = new HashSet<>(Arrays.asList("started", "reindexing", "analyzing", "stopping", "stopped", "failed"));
Set<String> ids = new HashSet<>(Arrays.asList("starting", "started", "reindexing", "analyzing", "stopping", "stopped", "failed"));
Set<String> analyticsToStop = TransportStopDataFrameAnalyticsAction.findAnalyticsToStop(tasksBuilder.build(), ids, true);
assertThat(analyticsToStop, containsInAnyOrder("started", "reindexing", "analyzing", "failed"));
assertThat(analyticsToStop, containsInAnyOrder("starting", "started", "reindexing", "analyzing", "failed"));
}
private static void addAnalyticsTask(PersistentTasksCustomMetaData.Builder builder, String analyticsId, String nodeId,
DataFrameAnalyticsState state) {
addAnalyticsTask(builder, analyticsId, nodeId, state, false);
}
private static void addAnalyticsTask(PersistentTasksCustomMetaData.Builder builder, String analyticsId, String nodeId,
DataFrameAnalyticsState state, boolean allowLazyStart) {
builder.addTask(MlTasks.dataFrameAnalyticsTaskId(analyticsId), MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME,
new StartDataFrameAnalyticsAction.TaskParams(analyticsId, Version.CURRENT, Collections.emptyList()),
new StartDataFrameAnalyticsAction.TaskParams(analyticsId, Version.CURRENT, Collections.emptyList(), allowLazyStart),
new PersistentTasksCustomMetaData.Assignment(nodeId, "test assignment"));
builder.updateTaskState(MlTasks.dataFrameAnalyticsTaskId(analyticsId),
new DataFrameAnalyticsTaskState(state, builder.getLastAllocationId(), null));
if (state != null) {
builder.updateTaskState(MlTasks.dataFrameAnalyticsTaskId(analyticsId),
new DataFrameAnalyticsTaskState(state, builder.getLastAllocationId(), null));
}
}
}

View File

@ -5,7 +5,6 @@
*/
package org.elasticsearch.xpack.ml.integration;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
@ -167,16 +166,9 @@ public class MlDistributedFailureIT extends BaseMlIntegTestCase {
StopDatafeedAction.Response stopDatafeedResponse = client().execute(StopDatafeedAction.INSTANCE, stopDatafeedRequest).actionGet();
assertTrue(stopDatafeedResponse.isStopped());
// Can't normal stop an unassigned job
// Since 7.5 we can also stop an unassigned job either normally or by force
CloseJobAction.Request closeJobRequest = new CloseJobAction.Request(jobId);
ElasticsearchStatusException statusException = expectThrows(ElasticsearchStatusException.class,
() -> client().execute(CloseJobAction.INSTANCE, closeJobRequest).actionGet());
assertEquals("Cannot close job [" + jobId +
"] because the job does not have an assigned node. Use force close to close the job",
statusException.getMessage());
// Can only force close an unassigned job
closeJobRequest.setForce(true);
closeJobRequest.setForce(randomBoolean());
CloseJobAction.Response closeJobResponse = client().execute(CloseJobAction.INSTANCE, closeJobRequest).actionGet();
assertTrue(closeJobResponse.isClosed());
}

View File

@ -611,13 +611,13 @@ public class JobNodeSelectorTests extends ESTestCase {
static void addDataFrameAnalyticsJobTask(String id, String nodeId, DataFrameAnalyticsState state,
PersistentTasksCustomMetaData.Builder builder) {
addDataFrameAnalyticsJobTask(id, nodeId, state, builder, false);
addDataFrameAnalyticsJobTask(id, nodeId, state, builder, false, false);
}
static void addDataFrameAnalyticsJobTask(String id, String nodeId, DataFrameAnalyticsState state,
PersistentTasksCustomMetaData.Builder builder, boolean isStale) {
PersistentTasksCustomMetaData.Builder builder, boolean isStale, boolean allowLazyStart) {
builder.addTask(MlTasks.dataFrameAnalyticsTaskId(id), MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME,
new StartDataFrameAnalyticsAction.TaskParams(id, Version.CURRENT, Collections.emptyList()),
new StartDataFrameAnalyticsAction.TaskParams(id, Version.CURRENT, Collections.emptyList(), allowLazyStart),
new PersistentTasksCustomMetaData.Assignment(nodeId, "test assignment"));
if (state != null) {
builder.updateTaskState(MlTasks.dataFrameAnalyticsTaskId(id),

View File

@ -101,7 +101,7 @@ public class MlMemoryTrackerTests extends ESTestCase {
for (int i = 1; i <= numDataFrameAnalyticsTasks; ++i) {
String id = "analytics" + i;
allIds.add(id);
PersistentTasksCustomMetaData.PersistentTask<?> task = makeTestDataFrameAnalyticsTask(id);
PersistentTasksCustomMetaData.PersistentTask<?> task = makeTestDataFrameAnalyticsTask(id, false);
tasks.put(task.getId(), task);
}
@ -142,7 +142,7 @@ public class MlMemoryTrackerTests extends ESTestCase {
int numDataFrameAnalyticsTasks = randomIntBetween(2, 5);
for (int i = 1; i <= numDataFrameAnalyticsTasks; ++i) {
String id = "analytics" + i;
PersistentTasksCustomMetaData.PersistentTask<?> task = makeTestDataFrameAnalyticsTask(id);
PersistentTasksCustomMetaData.PersistentTask<?> task = makeTestDataFrameAnalyticsTask(id, false);
tasks.put(task.getId(), task);
}
@ -261,9 +261,10 @@ public class MlMemoryTrackerTests extends ESTestCase {
}
private
PersistentTasksCustomMetaData.PersistentTask<StartDataFrameAnalyticsAction.TaskParams> makeTestDataFrameAnalyticsTask(String id) {
PersistentTasksCustomMetaData.PersistentTask<StartDataFrameAnalyticsAction.TaskParams>
makeTestDataFrameAnalyticsTask(String id, boolean allowLazyStart) {
return new PersistentTasksCustomMetaData.PersistentTask<>(MlTasks.dataFrameAnalyticsTaskId(id),
MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, new StartDataFrameAnalyticsAction.TaskParams(id, Version.CURRENT,
Collections.emptyList()), 0, PersistentTasksCustomMetaData.INITIAL_ASSIGNMENT);
Collections.emptyList(), allowLazyStart), 0, PersistentTasksCustomMetaData.INITIAL_ASSIGNMENT);
}
}

View File

@ -113,8 +113,9 @@
- match: { analysis_limits.model_memory_limit: "2048mb" }
---
"Test put job with model_memory_limit as string":
"Test put job with model_memory_limit as string and lazy open":
- skip:
features: headers
- do:
ml.put_job:
job_id: job-model-memory-limit-as-string
@ -126,11 +127,42 @@
"data_description" : {
},
"analysis_limits": {
"model_memory_limit": "3g"
}
"model_memory_limit": "3000g"
},
"allow_lazy_open": true
}
- match: { job_id: "job-model-memory-limit-as-string" }
- match: { analysis_limits.model_memory_limit: "3072mb" }
- match: { analysis_limits.model_memory_limit: "3072000mb" }
# The assumption here is that a 3000GB job will not fit on the test
# node - increase in future if the test ever fails because of this!
# But because the job is allowed to open lazily, opening it shouldn't
# throw an exception - it should wait for a big enough node to be
# added to the cluster.
- do:
ml.open_job:
job_id: job-model-memory-limit-as-string
- match: { opened: false }
- do:
headers:
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
ml.get_job_stats:
job_id: job-model-memory-limit-as-string
- match: {"jobs.0.state": opening}
# Despite never being assigned to a node, the job should close gracefully
- do:
ml.close_job:
job_id: job-model-memory-limit-as-string
- match: { closed: true }
- do:
headers:
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
ml.get_job_stats:
job_id: job-model-memory-limit-as-string
- match: {"jobs.0.state": closed}
---
"Test get job API with non existing job id":