diff --git a/docs/en/rest-api/ml/get-snapshot.asciidoc b/docs/en/rest-api/ml/get-snapshot.asciidoc index 7ac2a3d765f..b992f5be7df 100644 --- a/docs/en/rest-api/ml/get-snapshot.asciidoc +++ b/docs/en/rest-api/ml/get-snapshot.asciidoc @@ -87,6 +87,7 @@ In this example, the API provides a single result: "model_snapshots": [ { "job_id": "farequote", + "min_version": "6.3.0", "timestamp": 1491948163000, "description": "State persisted due to job close at 2017-04-11T15:02:43-0700", "snapshot_id": "1491948163", diff --git a/docs/en/rest-api/ml/revert-snapshot.asciidoc b/docs/en/rest-api/ml/revert-snapshot.asciidoc index 2e4d8e19604..72b934a56b7 100644 --- a/docs/en/rest-api/ml/revert-snapshot.asciidoc +++ b/docs/en/rest-api/ml/revert-snapshot.asciidoc @@ -102,6 +102,7 @@ When the operation is complete, you receive the following results: { "model": { "job_id": "it_ops_new_kpi", + "min_version": "6.3.0", "timestamp": 1491856080000, "description": "State persisted due to job close at 2017-04-10T13:28:00-0700", "snapshot_id": "1491856080", diff --git a/docs/en/rest-api/ml/snapshotresource.asciidoc b/docs/en/rest-api/ml/snapshotresource.asciidoc index fbcf038f3e0..fb2e3d83de6 100644 --- a/docs/en/rest-api/ml/snapshotresource.asciidoc +++ b/docs/en/rest-api/ml/snapshotresource.asciidoc @@ -23,6 +23,9 @@ A model snapshot resource has the following properties: (string) A numerical character string that uniquely identifies the job that the snapshot was created for. +`min_version`:: + (string) The minimum version required to be able to restore the model snapshot. + `latest_record_time_stamp`:: (date) The timestamp of the latest processed record. diff --git a/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java b/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java index 39420769f27..3bdb1586990 100644 --- a/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java +++ b/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java @@ -79,6 +79,7 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO public static final ParseField MODEL_SNAPSHOT_RETENTION_DAYS = new ParseField("model_snapshot_retention_days"); public static final ParseField RESULTS_RETENTION_DAYS = new ParseField("results_retention_days"); public static final ParseField MODEL_SNAPSHOT_ID = new ParseField("model_snapshot_id"); + public static final ParseField MODEL_SNAPSHOT_MIN_VERSION = new ParseField("model_snapshot_min_version"); public static final ParseField RESULTS_INDEX_NAME = new ParseField("results_index_name"); public static final ParseField DELETED = new ParseField("deleted"); @@ -143,6 +144,7 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO parser.declareLong(Builder::setModelSnapshotRetentionDays, MODEL_SNAPSHOT_RETENTION_DAYS); parser.declareField(Builder::setCustomSettings, (p, c) -> p.map(), CUSTOM_SETTINGS, ValueType.OBJECT); parser.declareStringOrNull(Builder::setModelSnapshotId, MODEL_SNAPSHOT_ID); + parser.declareStringOrNull(Builder::setModelSnapshotMinVersion, MODEL_SNAPSHOT_MIN_VERSION); parser.declareString(Builder::setResultsIndexName, RESULTS_INDEX_NAME); parser.declareBoolean(Builder::setDeleted, DELETED); } @@ -175,6 +177,7 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO private final Long resultsRetentionDays; private final Map customSettings; private final String modelSnapshotId; + private final Version modelSnapshotMinVersion; private final String resultsIndexName; private final boolean deleted; @@ -183,7 +186,7 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO AnalysisConfig analysisConfig, AnalysisLimits analysisLimits, DataDescription dataDescription, ModelPlotConfig modelPlotConfig, Long renormalizationWindowDays, TimeValue backgroundPersistInterval, Long modelSnapshotRetentionDays, Long resultsRetentionDays, Map customSettings, - String modelSnapshotId, String resultsIndexName, boolean deleted) { + String modelSnapshotId, Version modelSnapshotMinVersion, String resultsIndexName, boolean deleted) { this.jobId = jobId; this.jobType = jobType; @@ -204,6 +207,7 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO this.resultsRetentionDays = resultsRetentionDays; this.customSettings = customSettings; this.modelSnapshotId = modelSnapshotId; + this.modelSnapshotMinVersion = modelSnapshotMinVersion; this.resultsIndexName = resultsIndexName; this.deleted = deleted; } @@ -240,6 +244,11 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO resultsRetentionDays = in.readOptionalLong(); customSettings = in.readMap(); modelSnapshotId = in.readOptionalString(); + if (in.getVersion().onOrAfter(Version.V_6_3_0) && in.readBoolean()) { + modelSnapshotMinVersion = Version.readVersion(in); + } else { + modelSnapshotMinVersion = null; + } resultsIndexName = in.readString(); deleted = in.readBoolean(); } @@ -398,6 +407,10 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO return modelSnapshotId; } + public Version getModelSnapshotMinVersion() { + return modelSnapshotMinVersion; + } + public boolean isDeleted() { return deleted; } @@ -507,6 +520,14 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO out.writeOptionalLong(resultsRetentionDays); out.writeMap(customSettings); out.writeOptionalString(modelSnapshotId); + if (out.getVersion().onOrAfter(Version.V_6_3_0)) { + if (modelSnapshotMinVersion != null) { + out.writeBoolean(true); + Version.writeVersion(modelSnapshotMinVersion, out); + } else { + out.writeBoolean(false); + } + } out.writeString(resultsIndexName); out.writeBoolean(deleted); } @@ -573,6 +594,9 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO if (modelSnapshotId != null) { builder.field(MODEL_SNAPSHOT_ID.getPreferredName(), modelSnapshotId); } + if (modelSnapshotMinVersion != null) { + builder.field(MODEL_SNAPSHOT_MIN_VERSION.getPreferredName(), modelSnapshotMinVersion); + } builder.field(RESULTS_INDEX_NAME.getPreferredName(), resultsIndexName); if (params.paramAsBoolean("all", false)) { builder.field(DELETED.getPreferredName(), deleted); @@ -609,6 +633,7 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO && Objects.equals(this.resultsRetentionDays, that.resultsRetentionDays) && Objects.equals(this.customSettings, that.customSettings) && Objects.equals(this.modelSnapshotId, that.modelSnapshotId) + && Objects.equals(this.modelSnapshotMinVersion, that.modelSnapshotMinVersion) && Objects.equals(this.resultsIndexName, that.resultsIndexName) && Objects.equals(this.deleted, that.deleted); } @@ -618,7 +643,7 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO return Objects.hash(jobId, jobType, jobVersion, groups, description, createTime, finishedTime, lastDataTime, establishedModelMemory, analysisConfig, analysisLimits, dataDescription, modelPlotConfig, renormalizationWindowDays, backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, customSettings, - modelSnapshotId, resultsIndexName, deleted); + modelSnapshotId, modelSnapshotMinVersion, resultsIndexName, deleted); } // Class already extends from AbstractDiffable, so copied from ToXContentToBytes#toString() @@ -667,6 +692,7 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO private Long resultsRetentionDays; private Map customSettings; private String modelSnapshotId; + private Version modelSnapshotMinVersion; private String resultsIndexName; private boolean deleted; @@ -697,6 +723,7 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO this.resultsRetentionDays = job.getResultsRetentionDays(); this.customSettings = job.getCustomSettings(); this.modelSnapshotId = job.getModelSnapshotId(); + this.modelSnapshotMinVersion = job.getModelSnapshotMinVersion(); this.resultsIndexName = job.getResultsIndexNameNoPrefix(); this.deleted = job.isDeleted(); } @@ -729,6 +756,11 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO resultsRetentionDays = in.readOptionalLong(); customSettings = in.readMap(); modelSnapshotId = in.readOptionalString(); + if (in.getVersion().onOrAfter(Version.V_6_3_0) && in.readBoolean()) { + modelSnapshotMinVersion = Version.readVersion(in); + } else { + modelSnapshotMinVersion = null; + } resultsIndexName = in.readOptionalString(); deleted = in.readBoolean(); } @@ -841,6 +873,16 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO return this; } + public Builder setModelSnapshotMinVersion(Version modelSnapshotMinVersion) { + this.modelSnapshotMinVersion = modelSnapshotMinVersion; + return this; + } + + public Builder setModelSnapshotMinVersion(String modelSnapshotMinVersion) { + this.modelSnapshotMinVersion = Version.fromString(modelSnapshotMinVersion); + return this; + } + public Builder setResultsIndexName(String resultsIndexName) { this.resultsIndexName = resultsIndexName; return this; @@ -921,6 +963,14 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO out.writeOptionalLong(resultsRetentionDays); out.writeMap(customSettings); out.writeOptionalString(modelSnapshotId); + if (out.getVersion().onOrAfter(Version.V_6_3_0)) { + if (modelSnapshotMinVersion != null) { + out.writeBoolean(true); + Version.writeVersion(modelSnapshotMinVersion, out); + } else { + out.writeBoolean(false); + } + } out.writeOptionalString(resultsIndexName); out.writeBoolean(deleted); } @@ -980,6 +1030,9 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO if (modelSnapshotId != null) { builder.field(MODEL_SNAPSHOT_ID.getPreferredName(), modelSnapshotId); } + if (modelSnapshotMinVersion != null) { + builder.field(MODEL_SNAPSHOT_MIN_VERSION.getPreferredName(), modelSnapshotMinVersion); + } if (resultsIndexName != null) { builder.field(RESULTS_INDEX_NAME.getPreferredName(), resultsIndexName); } @@ -1015,6 +1068,7 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO && Objects.equals(this.resultsRetentionDays, that.resultsRetentionDays) && Objects.equals(this.customSettings, that.customSettings) && Objects.equals(this.modelSnapshotId, that.modelSnapshotId) + && Objects.equals(this.modelSnapshotMinVersion, that.modelSnapshotMinVersion) && Objects.equals(this.resultsIndexName, that.resultsIndexName) && Objects.equals(this.deleted, that.deleted); } @@ -1024,7 +1078,7 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO return Objects.hash(id, jobType, jobVersion, description, analysisConfig, analysisLimits, dataDescription, createTime, finishedTime, lastDataTime, establishedModelMemory, modelPlotConfig, renormalizationWindowDays, backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, customSettings, modelSnapshotId, - resultsIndexName, deleted); + modelSnapshotMinVersion, resultsIndexName, deleted); } /** @@ -1149,7 +1203,7 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO id, jobType, jobVersion, groups, description, createTime, finishedTime, lastDataTime, establishedModelMemory, analysisConfig, analysisLimits, dataDescription, modelPlotConfig, renormalizationWindowDays, backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, customSettings, - modelSnapshotId, resultsIndexName, deleted); + modelSnapshotId, modelSnapshotMinVersion, resultsIndexName, deleted); } private void checkValidBackgroundPersistInterval() { diff --git a/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdate.java b/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdate.java index ccacffffb59..24a1835d4e0 100644 --- a/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdate.java +++ b/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdate.java @@ -48,6 +48,7 @@ public class JobUpdate implements Writeable, ToXContentObject { PARSER.declareStringArray(Builder::setCategorizationFilters, AnalysisConfig.CATEGORIZATION_FILTERS); PARSER.declareField(Builder::setCustomSettings, (p, c) -> p.map(), Job.CUSTOM_SETTINGS, ObjectParser.ValueType.OBJECT); PARSER.declareString(Builder::setModelSnapshotId, Job.MODEL_SNAPSHOT_ID); + PARSER.declareString(Builder::setModelSnapshotMinVersion, Job.MODEL_SNAPSHOT_MIN_VERSION); PARSER.declareLong(Builder::setEstablishedModelMemory, Job.ESTABLISHED_MODEL_MEMORY); } @@ -64,6 +65,7 @@ public class JobUpdate implements Writeable, ToXContentObject { private final List categorizationFilters; private final Map customSettings; private final String modelSnapshotId; + private final Version modelSnapshotMinVersion; private final Long establishedModelMemory; private JobUpdate(String jobId, @Nullable List groups, @Nullable String description, @@ -72,7 +74,7 @@ public class JobUpdate implements Writeable, ToXContentObject { @Nullable Long renormalizationWindowDays, @Nullable Long resultsRetentionDays, @Nullable Long modelSnapshotRetentionDays, @Nullable List categorisationFilters, @Nullable Map customSettings, @Nullable String modelSnapshotId, - @Nullable Long establishedModelMemory) { + @Nullable Version modelSnapshotMinVersion, @Nullable Long establishedModelMemory) { this.jobId = jobId; this.groups = groups; this.description = description; @@ -86,6 +88,7 @@ public class JobUpdate implements Writeable, ToXContentObject { this.categorizationFilters = categorisationFilters; this.customSettings = customSettings; this.modelSnapshotId = modelSnapshotId; + this.modelSnapshotMinVersion = modelSnapshotMinVersion; this.establishedModelMemory = establishedModelMemory; } @@ -116,6 +119,11 @@ public class JobUpdate implements Writeable, ToXContentObject { } customSettings = in.readMap(); modelSnapshotId = in.readOptionalString(); + if (in.getVersion().onOrAfter(Version.V_6_3_0) && in.readBoolean()) { + modelSnapshotMinVersion = Version.readVersion(in); + } else { + modelSnapshotMinVersion = null; + } if (in.getVersion().onOrAfter(Version.V_6_1_0)) { establishedModelMemory = in.readOptionalLong(); } else { @@ -147,6 +155,14 @@ public class JobUpdate implements Writeable, ToXContentObject { } out.writeMap(customSettings); out.writeOptionalString(modelSnapshotId); + if (out.getVersion().onOrAfter(Version.V_6_3_0)) { + if (modelSnapshotMinVersion != null) { + out.writeBoolean(true); + Version.writeVersion(modelSnapshotMinVersion, out); + } else { + out.writeBoolean(false); + } + } if (out.getVersion().onOrAfter(Version.V_6_1_0)) { out.writeOptionalLong(establishedModelMemory); } @@ -204,6 +220,10 @@ public class JobUpdate implements Writeable, ToXContentObject { return modelSnapshotId; } + public Version getModelSnapshotMinVersion() { + return modelSnapshotMinVersion; + } + public Long getEstablishedModelMemory() { return establishedModelMemory; } @@ -252,6 +272,9 @@ public class JobUpdate implements Writeable, ToXContentObject { if (modelSnapshotId != null) { builder.field(Job.MODEL_SNAPSHOT_ID.getPreferredName(), modelSnapshotId); } + if (modelSnapshotMinVersion != null) { + builder.field(Job.MODEL_SNAPSHOT_MIN_VERSION.getPreferredName(), modelSnapshotMinVersion); + } if (establishedModelMemory != null) { builder.field(Job.ESTABLISHED_MODEL_MEMORY.getPreferredName(), establishedModelMemory); } @@ -297,6 +320,9 @@ public class JobUpdate implements Writeable, ToXContentObject { if (modelSnapshotId != null) { updateFields.add(Job.MODEL_SNAPSHOT_ID.getPreferredName()); } + if (modelSnapshotMinVersion != null) { + updateFields.add(Job.MODEL_SNAPSHOT_MIN_VERSION.getPreferredName()); + } if (establishedModelMemory != null) { updateFields.add(Job.ESTABLISHED_MODEL_MEMORY.getPreferredName()); } @@ -371,6 +397,9 @@ public class JobUpdate implements Writeable, ToXContentObject { if (modelSnapshotId != null) { builder.setModelSnapshotId(modelSnapshotId); } + if (modelSnapshotMinVersion != null) { + builder.setModelSnapshotMinVersion(modelSnapshotMinVersion); + } if (establishedModelMemory != null) { // An established model memory of zero means we don't actually know the established model memory if (establishedModelMemory > 0) { @@ -407,6 +436,7 @@ public class JobUpdate implements Writeable, ToXContentObject { && Objects.equals(this.categorizationFilters, that.categorizationFilters) && Objects.equals(this.customSettings, that.customSettings) && Objects.equals(this.modelSnapshotId, that.modelSnapshotId) + && Objects.equals(this.modelSnapshotMinVersion, that.modelSnapshotMinVersion) && Objects.equals(this.establishedModelMemory, that.establishedModelMemory); } @@ -414,7 +444,7 @@ public class JobUpdate implements Writeable, ToXContentObject { public int hashCode() { return Objects.hash(jobId, groups, description, detectorUpdates, modelPlotConfig, analysisLimits, renormalizationWindowDays, backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, categorizationFilters, customSettings, - modelSnapshotId, establishedModelMemory); + modelSnapshotId, modelSnapshotMinVersion, establishedModelMemory); } public static class DetectorUpdate implements Writeable, ToXContentObject { @@ -523,6 +553,7 @@ public class JobUpdate implements Writeable, ToXContentObject { private List categorizationFilters; private Map customSettings; private String modelSnapshotId; + private Version modelSnapshotMinVersion; private Long establishedModelMemory; public Builder(String jobId) { @@ -594,6 +625,16 @@ public class JobUpdate implements Writeable, ToXContentObject { return this; } + public Builder setModelSnapshotMinVersion(Version modelSnapshotMinVersion) { + this.modelSnapshotMinVersion = modelSnapshotMinVersion; + return this; + } + + public Builder setModelSnapshotMinVersion(String modelSnapshotMinVersion) { + this.modelSnapshotMinVersion = Version.fromString(modelSnapshotMinVersion); + return this; + } + public Builder setEstablishedModelMemory(Long establishedModelMemory) { this.establishedModelMemory = establishedModelMemory; return this; @@ -602,7 +643,7 @@ public class JobUpdate implements Writeable, ToXContentObject { public JobUpdate build() { return new JobUpdate(jobId, groups, description, detectorUpdates, modelPlotConfig, analysisLimits, backgroundPersistInterval, renormalizationWindowDays, resultsRetentionDays, modelSnapshotRetentionDays, categorizationFilters, customSettings, - modelSnapshotId, establishedModelMemory); + modelSnapshotId, modelSnapshotMinVersion, establishedModelMemory); } } } diff --git a/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/ModelSnapshot.java b/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/ModelSnapshot.java index 5481c674da4..3449dab0646 100644 --- a/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/ModelSnapshot.java +++ b/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/ModelSnapshot.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.core.ml.job.process.autodetect.state; import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.Version; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.StreamInput; @@ -44,6 +45,7 @@ public class ModelSnapshot implements ToXContentObject, Writeable { public static final ParseField LATEST_RESULT_TIME = new ParseField("latest_result_time_stamp"); public static final ParseField QUANTILES = new ParseField("quantiles"); public static final ParseField RETAIN = new ParseField("retain"); + public static final ParseField MIN_VERSION = new ParseField("min_version"); // Used for QueryPage public static final ParseField RESULTS_FIELD = new ParseField("model_snapshots"); @@ -57,6 +59,7 @@ public class ModelSnapshot implements ToXContentObject, Writeable { static { PARSER.declareString(Builder::setJobId, Job.ID); + PARSER.declareString(Builder::setMinVersion, MIN_VERSION); PARSER.declareField(Builder::setTimestamp, p -> { if (p.currentToken() == Token.VALUE_NUMBER) { return new Date(p.longValue()); @@ -91,7 +94,15 @@ public class ModelSnapshot implements ToXContentObject, Writeable { PARSER.declareBoolean(Builder::setRetain, RETAIN); } + private final String jobId; + + /** + * The minimum version a node should have to be able + * to read this model snapshot. + */ + private final Version minVersion; + private final Date timestamp; private final String description; private final String snapshotId; @@ -102,10 +113,12 @@ public class ModelSnapshot implements ToXContentObject, Writeable { private final Quantiles quantiles; private final boolean retain; - private ModelSnapshot(String jobId, Date timestamp, String description, String snapshotId, int snapshotDocCount, + + private ModelSnapshot(String jobId, Version minVersion, Date timestamp, String description, String snapshotId, int snapshotDocCount, ModelSizeStats modelSizeStats, Date latestRecordTimeStamp, Date latestResultTimeStamp, Quantiles quantiles, boolean retain) { this.jobId = jobId; + this.minVersion = minVersion; this.timestamp = timestamp; this.description = description; this.snapshotId = snapshotId; @@ -119,6 +132,11 @@ public class ModelSnapshot implements ToXContentObject, Writeable { public ModelSnapshot(StreamInput in) throws IOException { jobId = in.readString(); + if (in.getVersion().onOrAfter(Version.V_6_3_0)) { + minVersion = Version.readVersion(in); + } else { + minVersion = Version.V_5_5_0; + } timestamp = in.readBoolean() ? new Date(in.readVLong()) : null; description = in.readOptionalString(); snapshotId = in.readOptionalString(); @@ -133,6 +151,9 @@ public class ModelSnapshot implements ToXContentObject, Writeable { @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(jobId); + if (out.getVersion().onOrAfter(Version.V_6_3_0)) { + Version.writeVersion(minVersion, out); + } if (timestamp != null) { out.writeBoolean(true); out.writeVLong(timestamp.getTime()); @@ -163,6 +184,7 @@ public class ModelSnapshot implements ToXContentObject, Writeable { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); builder.field(Job.ID.getPreferredName(), jobId); + builder.field(MIN_VERSION.getPreferredName(), minVersion); if (timestamp != null) { builder.dateField(TIMESTAMP.getPreferredName(), TIMESTAMP.getPreferredName() + "_string", timestamp.getTime()); } @@ -196,6 +218,10 @@ public class ModelSnapshot implements ToXContentObject, Writeable { return jobId; } + public Version getMinVersion() { + return minVersion; + } + public Date getTimestamp() { return timestamp; } @@ -230,8 +256,8 @@ public class ModelSnapshot implements ToXContentObject, Writeable { @Override public int hashCode() { - return Objects.hash(jobId, timestamp, description, snapshotId, quantiles, snapshotDocCount, modelSizeStats, latestRecordTimeStamp, - latestResultTimeStamp, retain); + return Objects.hash(jobId, minVersion, timestamp, description, snapshotId, quantiles, snapshotDocCount, modelSizeStats, + latestRecordTimeStamp, latestResultTimeStamp, retain); } /** @@ -250,6 +276,7 @@ public class ModelSnapshot implements ToXContentObject, Writeable { ModelSnapshot that = (ModelSnapshot) other; return Objects.equals(this.jobId, that.jobId) + && Objects.equals(this.minVersion, that.minVersion) && Objects.equals(this.timestamp, that.timestamp) && Objects.equals(this.description, that.description) && Objects.equals(this.snapshotId, that.snapshotId) @@ -320,6 +347,12 @@ public class ModelSnapshot implements ToXContentObject, Writeable { public static class Builder { private String jobId; + + // Stored snapshot documents created prior to 6.3.0 will have no + // value for min_version. We default it to 5.5.0 as there were + // no model changes between 5.5.0 and 6.3.0. + private Version minVersion = Version.V_5_5_0; + private Date timestamp; private String description; private String snapshotId; @@ -330,6 +363,7 @@ public class ModelSnapshot implements ToXContentObject, Writeable { private Quantiles quantiles; private boolean retain; + public Builder() { } @@ -349,6 +383,7 @@ public class ModelSnapshot implements ToXContentObject, Writeable { this.latestResultTimeStamp = modelSnapshot.latestResultTimeStamp; this.quantiles = modelSnapshot.quantiles; this.retain = modelSnapshot.retain; + this.minVersion = modelSnapshot.minVersion; } public Builder setJobId(String jobId) { @@ -356,6 +391,16 @@ public class ModelSnapshot implements ToXContentObject, Writeable { return this; } + public Builder setMinVersion(Version minVersion) { + this.minVersion = minVersion; + return this; + } + + public Builder setMinVersion(String minVersion) { + this.minVersion = Version.fromString(minVersion); + return this; + } + public Builder setTimestamp(Date timestamp) { this.timestamp = timestamp; return this; @@ -407,8 +452,8 @@ public class ModelSnapshot implements ToXContentObject, Writeable { } public ModelSnapshot build() { - return new ModelSnapshot(jobId, timestamp, description, snapshotId, snapshotDocCount, modelSizeStats, latestRecordTimeStamp, - latestResultTimeStamp, quantiles, retain); + return new ModelSnapshot(jobId, minVersion, timestamp, description, snapshotId, snapshotDocCount, modelSizeStats, + latestRecordTimeStamp, latestResultTimeStamp, quantiles, retain); } } } diff --git a/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobTests.java b/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobTests.java index 15d1cc9300c..c1fae72af27 100644 --- a/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobTests.java +++ b/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobTests.java @@ -661,6 +661,9 @@ public class JobTests extends AbstractSerializingTestCase { if (randomBoolean()) { builder.setModelSnapshotId(randomAlphaOfLength(10)); } + if (randomBoolean()) { + builder.setModelSnapshotMinVersion(Version.CURRENT); + } if (randomBoolean()) { builder.setResultsIndexName(randomValidJobId()); } diff --git a/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdateTests.java b/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdateTests.java index d1e7855d989..7a976c89cdb 100644 --- a/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdateTests.java +++ b/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdateTests.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.core.ml.job.config; import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.Version; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; @@ -86,6 +87,9 @@ public class JobUpdateTests extends AbstractSerializingTestCase { if (randomBoolean()) { update.setModelSnapshotId(randomAlphaOfLength(10)); } + if (randomBoolean()) { + update.setModelSnapshotMinVersion(Version.CURRENT); + } if (randomBoolean()) { update.setEstablishedModelMemory(randomNonNegativeLong()); } diff --git a/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/ModelSnapshotTests.java b/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/ModelSnapshotTests.java index 91518f8eb28..0ac8a5be5b2 100644 --- a/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/ModelSnapshotTests.java +++ b/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/ModelSnapshotTests.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.core.ml.job.process.autodetect.state; +import org.elasticsearch.Version; import org.elasticsearch.common.io.stream.Writeable.Reader; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentParser; @@ -126,6 +127,7 @@ public class ModelSnapshotTests extends AbstractSerializingTestCase= 5.5.0. + return true; + } + return node.getVersion().onOrAfter(job.getModelSnapshotMinVersion()); + } + static String[] mappingRequiresUpdate(ClusterState state, String[] concreteIndices, Version minVersion, Logger logger) throws IOException { List indicesToUpdate = new ArrayList<>(); diff --git a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java index 108bd32026a..fa50b79e5d5 100644 --- a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java +++ b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java @@ -75,7 +75,7 @@ public class AutoDetectResultProcessor { private final boolean restoredSnapshot; final CountDownLatch completionLatch = new CountDownLatch(1); - final Semaphore updateModelSnapshotIdSemaphore = new Semaphore(1); + final Semaphore updateModelSnapshotSemaphore = new Semaphore(1); private final FlushListener flushListener; private volatile boolean processKilled; private volatile boolean failed; @@ -262,7 +262,7 @@ public class AutoDetectResultProcessor { if (modelSnapshot != null) { // We need to refresh in order for the snapshot to be available when we try to update the job with it persister.persistModelSnapshot(modelSnapshot, WriteRequest.RefreshPolicy.IMMEDIATE); - updateModelSnapshotIdOnJob(modelSnapshot); + updateModelSnapshotOnJob(modelSnapshot); } Quantiles quantiles = result.getQuantiles(); if (quantiles != null) { @@ -293,15 +293,18 @@ public class AutoDetectResultProcessor { } } - protected void updateModelSnapshotIdOnJob(ModelSnapshot modelSnapshot) { - JobUpdate update = new JobUpdate.Builder(jobId).setModelSnapshotId(modelSnapshot.getSnapshotId()).build(); + protected void updateModelSnapshotOnJob(ModelSnapshot modelSnapshot) { + JobUpdate update = new JobUpdate.Builder(jobId) + .setModelSnapshotId(modelSnapshot.getSnapshotId()) + .setModelSnapshotMinVersion(modelSnapshot.getMinVersion()) + .build(); UpdateJobAction.Request updateRequest = UpdateJobAction.Request.internal(jobId, update); try { // This blocks the main processing thread in the unlikely event // there are 2 model snapshots queued up. But it also has the // advantage of ensuring order - updateModelSnapshotIdSemaphore.acquire(); + updateModelSnapshotSemaphore.acquire(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); LOGGER.info("[{}] Interrupted acquiring update model snapshot semaphore", jobId); @@ -311,13 +314,13 @@ public class AutoDetectResultProcessor { executeAsyncWithOrigin(client, ML_ORIGIN, UpdateJobAction.INSTANCE, updateRequest, new ActionListener() { @Override public void onResponse(PutJobAction.Response response) { - updateModelSnapshotIdSemaphore.release(); + updateModelSnapshotSemaphore.release(); LOGGER.debug("[{}] Updated job with model snapshot id [{}]", jobId, modelSnapshot.getSnapshotId()); } @Override public void onFailure(Exception e) { - updateModelSnapshotIdSemaphore.release(); + updateModelSnapshotSemaphore.release(); LOGGER.error("[" + jobId + "] Failed to update job with new model snapshot id [" + modelSnapshot.getSnapshotId() + "]", e); } @@ -355,9 +358,9 @@ public class AutoDetectResultProcessor { throw new TimeoutException("Timed out waiting for results processor to complete for job " + jobId); } // Input stream has been completely processed at this point. - // Wait for any updateModelSnapshotIdOnJob calls to complete. - updateModelSnapshotIdSemaphore.acquire(); - updateModelSnapshotIdSemaphore.release(); + // Wait for any updateModelSnapshotOnJob calls to complete. + updateModelSnapshotSemaphore.acquire(); + updateModelSnapshotSemaphore.release(); // These lines ensure that the "completion" we're awaiting includes making the results searchable waitUntilRenormalizerIsIdle(); diff --git a/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java b/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java index eeb52073b26..ba9fb8775bc 100644 --- a/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java +++ b/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java @@ -27,6 +27,8 @@ import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.VersionUtils; @@ -41,8 +43,6 @@ import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields; import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; import org.elasticsearch.xpack.core.ml.notifications.AuditorField; -import org.elasticsearch.persistent.PersistentTasksCustomMetaData; -import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase; @@ -355,6 +355,38 @@ public class TransportOpenJobActionTests extends ESTestCase { assertNull(result.getExecutorNode()); } + public void testSelectLeastLoadedMlNode_noNodesMatchingModelSnapshotMinVersion() { + Map nodeAttr = new HashMap<>(); + nodeAttr.put(MachineLearning.ML_ENABLED_NODE_ATTR, "true"); + DiscoveryNodes nodes = DiscoveryNodes.builder() + .add(new DiscoveryNode("_node_name1", "_node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), + nodeAttr, Collections.emptySet(), Version.V_6_2_0)) + .add(new DiscoveryNode("_node_name2", "_node_id2", new TransportAddress(InetAddress.getLoopbackAddress(), 9301), + nodeAttr, Collections.emptySet(), Version.V_6_1_0)) + .build(); + + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + addJobTask("job_with_incompatible_model_snapshot", "_node_id1", null, tasksBuilder); + PersistentTasksCustomMetaData tasks = tasksBuilder.build(); + + ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name")); + MetaData.Builder metaData = MetaData.builder(); + RoutingTable.Builder routingTable = RoutingTable.builder(); + addJobAndIndices(metaData, routingTable, jobId -> BaseMlIntegTestCase.createFareQuoteJob(jobId) + .setModelSnapshotId("incompatible_snapshot") + .setModelSnapshotMinVersion(Version.V_6_3_0) + .build(new Date()), "job_with_incompatible_model_snapshot"); + cs.nodes(nodes); + metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks); + cs.metaData(metaData); + cs.routingTable(routingTable.build()); + Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_with_incompatible_model_snapshot", cs.build(), + 2, 10, 30, logger); + assertThat(result.getExplanation(), containsString( + "because the job's model snapshot requires a node of version [6.3.0] or higher")); + assertNull(result.getExecutorNode()); + } + public void testVerifyIndicesPrimaryShardsAreActive() { MetaData.Builder metaData = MetaData.builder(); RoutingTable.Builder routingTable = RoutingTable.builder(); diff --git a/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java b/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java index c3b5b27aa81..a6eb3355f9c 100644 --- a/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java +++ b/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java @@ -102,7 +102,7 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase { resultProcessor = new AutoDetectResultProcessor(client(), JOB_ID, renormalizer, new JobResultsPersister(nodeSettings(), client()), jobProvider, new ModelSizeStats.Builder(JOB_ID).build(), false) { @Override - protected void updateModelSnapshotIdOnJob(ModelSnapshot modelSnapshot) { + protected void updateModelSnapshotOnJob(ModelSnapshot modelSnapshot) { capturedUpdateModelSnapshotOnJobRequests.add(modelSnapshot); } }; diff --git a/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java b/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java index 29faa1d9c40..2d44bd86127 100644 --- a/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java +++ b/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml.job.process.autodetect.output; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.Version; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.client.Client; import org.elasticsearch.common.settings.Settings; @@ -314,13 +315,15 @@ public class AutoDetectResultProcessorTests extends ESTestCase { context.deleteInterimRequired = false; AutodetectResult result = mock(AutodetectResult.class); ModelSnapshot modelSnapshot = new ModelSnapshot.Builder(JOB_ID) - .setSnapshotId("a_snapshot_id").build(); + .setSnapshotId("a_snapshot_id") + .setMinVersion(Version.CURRENT) + .build(); when(result.getModelSnapshot()).thenReturn(modelSnapshot); processorUnderTest.processResult(context, result); verify(persister, times(1)).persistModelSnapshot(modelSnapshot, WriteRequest.RefreshPolicy.IMMEDIATE); UpdateJobAction.Request expectedJobUpdateRequest = UpdateJobAction.Request.internal(JOB_ID, - new JobUpdate.Builder(JOB_ID).setModelSnapshotId("a_snapshot_id").build()); + new JobUpdate.Builder(JOB_ID).setModelSnapshotId("a_snapshot_id").setModelSnapshotMinVersion(Version.CURRENT).build()); verify(client).execute(same(UpdateJobAction.INSTANCE), eq(expectedJobUpdateRequest), any()); verifyNoMoreInteractions(persister); @@ -378,7 +381,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase { processorUnderTest.awaitCompletion(); assertEquals(0, processorUnderTest.completionLatch.getCount()); - assertEquals(1, processorUnderTest.updateModelSnapshotIdSemaphore.availablePermits()); + assertEquals(1, processorUnderTest.updateModelSnapshotSemaphore.availablePermits()); } public void testPersisterThrowingDoesntBlockProcessing() { @@ -433,7 +436,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase { processorUnderTest.awaitCompletion(); assertEquals(0, processorUnderTest.completionLatch.getCount()); - assertEquals(1, processorUnderTest.updateModelSnapshotIdSemaphore.availablePermits()); + assertEquals(1, processorUnderTest.updateModelSnapshotSemaphore.availablePermits()); verify(persister, times(1)).commitResultWrites(JOB_ID); verify(persister, times(1)).commitStateWrites(JOB_ID);