[ML] Ensure job is not assigned to node that cannot read model_snapshot (elastic/x-pack-elasticsearch#4091)

This adds a minimum compatible version to the model snapshot.
Nodes with a version earlier than that version cannot read
that model snapshot. Thus, such jobs are not assigned to
incompatible nodes.

relates elastic/x-pack-elasticsearch#4077

Original commit: elastic/x-pack-elasticsearch@2ffa6adce0
This commit is contained in:
Dimitris Athanasiou 2018-03-15 17:38:52 +00:00 committed by GitHub
parent 92379ca9af
commit c10b2ea631
14 changed files with 239 additions and 29 deletions

View File

@ -87,6 +87,7 @@ In this example, the API provides a single result:
"model_snapshots": [
{
"job_id": "farequote",
"min_version": "6.3.0",
"timestamp": 1491948163000,
"description": "State persisted due to job close at 2017-04-11T15:02:43-0700",
"snapshot_id": "1491948163",

View File

@ -102,6 +102,7 @@ When the operation is complete, you receive the following results:
{
"model": {
"job_id": "it_ops_new_kpi",
"min_version": "6.3.0",
"timestamp": 1491856080000,
"description": "State persisted due to job close at 2017-04-10T13:28:00-0700",
"snapshot_id": "1491856080",

View File

@ -23,6 +23,9 @@ A model snapshot resource has the following properties:
(string) A numerical character string that uniquely identifies the job that
the snapshot was created for.
`min_version`::
(string) The minimum version required to be able to restore the model snapshot.
`latest_record_time_stamp`::
(date) The timestamp of the latest processed record.

View File

@ -79,6 +79,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
public static final ParseField MODEL_SNAPSHOT_RETENTION_DAYS = new ParseField("model_snapshot_retention_days");
public static final ParseField RESULTS_RETENTION_DAYS = new ParseField("results_retention_days");
public static final ParseField MODEL_SNAPSHOT_ID = new ParseField("model_snapshot_id");
public static final ParseField MODEL_SNAPSHOT_MIN_VERSION = new ParseField("model_snapshot_min_version");
public static final ParseField RESULTS_INDEX_NAME = new ParseField("results_index_name");
public static final ParseField DELETED = new ParseField("deleted");
@ -143,6 +144,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
parser.declareLong(Builder::setModelSnapshotRetentionDays, MODEL_SNAPSHOT_RETENTION_DAYS);
parser.declareField(Builder::setCustomSettings, (p, c) -> p.map(), CUSTOM_SETTINGS, ValueType.OBJECT);
parser.declareStringOrNull(Builder::setModelSnapshotId, MODEL_SNAPSHOT_ID);
parser.declareStringOrNull(Builder::setModelSnapshotMinVersion, MODEL_SNAPSHOT_MIN_VERSION);
parser.declareString(Builder::setResultsIndexName, RESULTS_INDEX_NAME);
parser.declareBoolean(Builder::setDeleted, DELETED);
}
@ -175,6 +177,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
private final Long resultsRetentionDays;
private final Map<String, Object> customSettings;
private final String modelSnapshotId;
private final Version modelSnapshotMinVersion;
private final String resultsIndexName;
private final boolean deleted;
@ -183,7 +186,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
AnalysisConfig analysisConfig, AnalysisLimits analysisLimits, DataDescription dataDescription,
ModelPlotConfig modelPlotConfig, Long renormalizationWindowDays, TimeValue backgroundPersistInterval,
Long modelSnapshotRetentionDays, Long resultsRetentionDays, Map<String, Object> customSettings,
String modelSnapshotId, String resultsIndexName, boolean deleted) {
String modelSnapshotId, Version modelSnapshotMinVersion, String resultsIndexName, boolean deleted) {
this.jobId = jobId;
this.jobType = jobType;
@ -204,6 +207,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
this.resultsRetentionDays = resultsRetentionDays;
this.customSettings = customSettings;
this.modelSnapshotId = modelSnapshotId;
this.modelSnapshotMinVersion = modelSnapshotMinVersion;
this.resultsIndexName = resultsIndexName;
this.deleted = deleted;
}
@ -240,6 +244,11 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
resultsRetentionDays = in.readOptionalLong();
customSettings = in.readMap();
modelSnapshotId = in.readOptionalString();
if (in.getVersion().onOrAfter(Version.V_6_3_0) && in.readBoolean()) {
modelSnapshotMinVersion = Version.readVersion(in);
} else {
modelSnapshotMinVersion = null;
}
resultsIndexName = in.readString();
deleted = in.readBoolean();
}
@ -398,6 +407,10 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
return modelSnapshotId;
}
public Version getModelSnapshotMinVersion() {
return modelSnapshotMinVersion;
}
public boolean isDeleted() {
return deleted;
}
@ -507,6 +520,14 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
out.writeOptionalLong(resultsRetentionDays);
out.writeMap(customSettings);
out.writeOptionalString(modelSnapshotId);
if (out.getVersion().onOrAfter(Version.V_6_3_0)) {
if (modelSnapshotMinVersion != null) {
out.writeBoolean(true);
Version.writeVersion(modelSnapshotMinVersion, out);
} else {
out.writeBoolean(false);
}
}
out.writeString(resultsIndexName);
out.writeBoolean(deleted);
}
@ -573,6 +594,9 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
if (modelSnapshotId != null) {
builder.field(MODEL_SNAPSHOT_ID.getPreferredName(), modelSnapshotId);
}
if (modelSnapshotMinVersion != null) {
builder.field(MODEL_SNAPSHOT_MIN_VERSION.getPreferredName(), modelSnapshotMinVersion);
}
builder.field(RESULTS_INDEX_NAME.getPreferredName(), resultsIndexName);
if (params.paramAsBoolean("all", false)) {
builder.field(DELETED.getPreferredName(), deleted);
@ -609,6 +633,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
&& Objects.equals(this.resultsRetentionDays, that.resultsRetentionDays)
&& Objects.equals(this.customSettings, that.customSettings)
&& Objects.equals(this.modelSnapshotId, that.modelSnapshotId)
&& Objects.equals(this.modelSnapshotMinVersion, that.modelSnapshotMinVersion)
&& Objects.equals(this.resultsIndexName, that.resultsIndexName)
&& Objects.equals(this.deleted, that.deleted);
}
@ -618,7 +643,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
return Objects.hash(jobId, jobType, jobVersion, groups, description, createTime, finishedTime, lastDataTime, establishedModelMemory,
analysisConfig, analysisLimits, dataDescription, modelPlotConfig, renormalizationWindowDays,
backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, customSettings,
modelSnapshotId, resultsIndexName, deleted);
modelSnapshotId, modelSnapshotMinVersion, resultsIndexName, deleted);
}
// Class already extends from AbstractDiffable, so copied from ToXContentToBytes#toString()
@ -667,6 +692,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
private Long resultsRetentionDays;
private Map<String, Object> customSettings;
private String modelSnapshotId;
private Version modelSnapshotMinVersion;
private String resultsIndexName;
private boolean deleted;
@ -697,6 +723,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
this.resultsRetentionDays = job.getResultsRetentionDays();
this.customSettings = job.getCustomSettings();
this.modelSnapshotId = job.getModelSnapshotId();
this.modelSnapshotMinVersion = job.getModelSnapshotMinVersion();
this.resultsIndexName = job.getResultsIndexNameNoPrefix();
this.deleted = job.isDeleted();
}
@ -729,6 +756,11 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
resultsRetentionDays = in.readOptionalLong();
customSettings = in.readMap();
modelSnapshotId = in.readOptionalString();
if (in.getVersion().onOrAfter(Version.V_6_3_0) && in.readBoolean()) {
modelSnapshotMinVersion = Version.readVersion(in);
} else {
modelSnapshotMinVersion = null;
}
resultsIndexName = in.readOptionalString();
deleted = in.readBoolean();
}
@ -841,6 +873,16 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
return this;
}
public Builder setModelSnapshotMinVersion(Version modelSnapshotMinVersion) {
this.modelSnapshotMinVersion = modelSnapshotMinVersion;
return this;
}
public Builder setModelSnapshotMinVersion(String modelSnapshotMinVersion) {
this.modelSnapshotMinVersion = Version.fromString(modelSnapshotMinVersion);
return this;
}
public Builder setResultsIndexName(String resultsIndexName) {
this.resultsIndexName = resultsIndexName;
return this;
@ -921,6 +963,14 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
out.writeOptionalLong(resultsRetentionDays);
out.writeMap(customSettings);
out.writeOptionalString(modelSnapshotId);
if (out.getVersion().onOrAfter(Version.V_6_3_0)) {
if (modelSnapshotMinVersion != null) {
out.writeBoolean(true);
Version.writeVersion(modelSnapshotMinVersion, out);
} else {
out.writeBoolean(false);
}
}
out.writeOptionalString(resultsIndexName);
out.writeBoolean(deleted);
}
@ -980,6 +1030,9 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
if (modelSnapshotId != null) {
builder.field(MODEL_SNAPSHOT_ID.getPreferredName(), modelSnapshotId);
}
if (modelSnapshotMinVersion != null) {
builder.field(MODEL_SNAPSHOT_MIN_VERSION.getPreferredName(), modelSnapshotMinVersion);
}
if (resultsIndexName != null) {
builder.field(RESULTS_INDEX_NAME.getPreferredName(), resultsIndexName);
}
@ -1015,6 +1068,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
&& Objects.equals(this.resultsRetentionDays, that.resultsRetentionDays)
&& Objects.equals(this.customSettings, that.customSettings)
&& Objects.equals(this.modelSnapshotId, that.modelSnapshotId)
&& Objects.equals(this.modelSnapshotMinVersion, that.modelSnapshotMinVersion)
&& Objects.equals(this.resultsIndexName, that.resultsIndexName)
&& Objects.equals(this.deleted, that.deleted);
}
@ -1024,7 +1078,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
return Objects.hash(id, jobType, jobVersion, description, analysisConfig, analysisLimits, dataDescription, createTime,
finishedTime, lastDataTime, establishedModelMemory, modelPlotConfig, renormalizationWindowDays,
backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, customSettings, modelSnapshotId,
resultsIndexName, deleted);
modelSnapshotMinVersion, resultsIndexName, deleted);
}
/**
@ -1149,7 +1203,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
id, jobType, jobVersion, groups, description, createTime, finishedTime, lastDataTime, establishedModelMemory,
analysisConfig, analysisLimits, dataDescription, modelPlotConfig, renormalizationWindowDays,
backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, customSettings,
modelSnapshotId, resultsIndexName, deleted);
modelSnapshotId, modelSnapshotMinVersion, resultsIndexName, deleted);
}
private void checkValidBackgroundPersistInterval() {

View File

@ -48,6 +48,7 @@ public class JobUpdate implements Writeable, ToXContentObject {
PARSER.declareStringArray(Builder::setCategorizationFilters, AnalysisConfig.CATEGORIZATION_FILTERS);
PARSER.declareField(Builder::setCustomSettings, (p, c) -> p.map(), Job.CUSTOM_SETTINGS, ObjectParser.ValueType.OBJECT);
PARSER.declareString(Builder::setModelSnapshotId, Job.MODEL_SNAPSHOT_ID);
PARSER.declareString(Builder::setModelSnapshotMinVersion, Job.MODEL_SNAPSHOT_MIN_VERSION);
PARSER.declareLong(Builder::setEstablishedModelMemory, Job.ESTABLISHED_MODEL_MEMORY);
}
@ -64,6 +65,7 @@ public class JobUpdate implements Writeable, ToXContentObject {
private final List<String> categorizationFilters;
private final Map<String, Object> customSettings;
private final String modelSnapshotId;
private final Version modelSnapshotMinVersion;
private final Long establishedModelMemory;
private JobUpdate(String jobId, @Nullable List<String> groups, @Nullable String description,
@ -72,7 +74,7 @@ public class JobUpdate implements Writeable, ToXContentObject {
@Nullable Long renormalizationWindowDays, @Nullable Long resultsRetentionDays,
@Nullable Long modelSnapshotRetentionDays, @Nullable List<String> categorisationFilters,
@Nullable Map<String, Object> customSettings, @Nullable String modelSnapshotId,
@Nullable Long establishedModelMemory) {
@Nullable Version modelSnapshotMinVersion, @Nullable Long establishedModelMemory) {
this.jobId = jobId;
this.groups = groups;
this.description = description;
@ -86,6 +88,7 @@ public class JobUpdate implements Writeable, ToXContentObject {
this.categorizationFilters = categorisationFilters;
this.customSettings = customSettings;
this.modelSnapshotId = modelSnapshotId;
this.modelSnapshotMinVersion = modelSnapshotMinVersion;
this.establishedModelMemory = establishedModelMemory;
}
@ -116,6 +119,11 @@ public class JobUpdate implements Writeable, ToXContentObject {
}
customSettings = in.readMap();
modelSnapshotId = in.readOptionalString();
if (in.getVersion().onOrAfter(Version.V_6_3_0) && in.readBoolean()) {
modelSnapshotMinVersion = Version.readVersion(in);
} else {
modelSnapshotMinVersion = null;
}
if (in.getVersion().onOrAfter(Version.V_6_1_0)) {
establishedModelMemory = in.readOptionalLong();
} else {
@ -147,6 +155,14 @@ public class JobUpdate implements Writeable, ToXContentObject {
}
out.writeMap(customSettings);
out.writeOptionalString(modelSnapshotId);
if (out.getVersion().onOrAfter(Version.V_6_3_0)) {
if (modelSnapshotMinVersion != null) {
out.writeBoolean(true);
Version.writeVersion(modelSnapshotMinVersion, out);
} else {
out.writeBoolean(false);
}
}
if (out.getVersion().onOrAfter(Version.V_6_1_0)) {
out.writeOptionalLong(establishedModelMemory);
}
@ -204,6 +220,10 @@ public class JobUpdate implements Writeable, ToXContentObject {
return modelSnapshotId;
}
public Version getModelSnapshotMinVersion() {
return modelSnapshotMinVersion;
}
public Long getEstablishedModelMemory() {
return establishedModelMemory;
}
@ -252,6 +272,9 @@ public class JobUpdate implements Writeable, ToXContentObject {
if (modelSnapshotId != null) {
builder.field(Job.MODEL_SNAPSHOT_ID.getPreferredName(), modelSnapshotId);
}
if (modelSnapshotMinVersion != null) {
builder.field(Job.MODEL_SNAPSHOT_MIN_VERSION.getPreferredName(), modelSnapshotMinVersion);
}
if (establishedModelMemory != null) {
builder.field(Job.ESTABLISHED_MODEL_MEMORY.getPreferredName(), establishedModelMemory);
}
@ -297,6 +320,9 @@ public class JobUpdate implements Writeable, ToXContentObject {
if (modelSnapshotId != null) {
updateFields.add(Job.MODEL_SNAPSHOT_ID.getPreferredName());
}
if (modelSnapshotMinVersion != null) {
updateFields.add(Job.MODEL_SNAPSHOT_MIN_VERSION.getPreferredName());
}
if (establishedModelMemory != null) {
updateFields.add(Job.ESTABLISHED_MODEL_MEMORY.getPreferredName());
}
@ -371,6 +397,9 @@ public class JobUpdate implements Writeable, ToXContentObject {
if (modelSnapshotId != null) {
builder.setModelSnapshotId(modelSnapshotId);
}
if (modelSnapshotMinVersion != null) {
builder.setModelSnapshotMinVersion(modelSnapshotMinVersion);
}
if (establishedModelMemory != null) {
// An established model memory of zero means we don't actually know the established model memory
if (establishedModelMemory > 0) {
@ -407,6 +436,7 @@ public class JobUpdate implements Writeable, ToXContentObject {
&& Objects.equals(this.categorizationFilters, that.categorizationFilters)
&& Objects.equals(this.customSettings, that.customSettings)
&& Objects.equals(this.modelSnapshotId, that.modelSnapshotId)
&& Objects.equals(this.modelSnapshotMinVersion, that.modelSnapshotMinVersion)
&& Objects.equals(this.establishedModelMemory, that.establishedModelMemory);
}
@ -414,7 +444,7 @@ public class JobUpdate implements Writeable, ToXContentObject {
public int hashCode() {
return Objects.hash(jobId, groups, description, detectorUpdates, modelPlotConfig, analysisLimits, renormalizationWindowDays,
backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, categorizationFilters, customSettings,
modelSnapshotId, establishedModelMemory);
modelSnapshotId, modelSnapshotMinVersion, establishedModelMemory);
}
public static class DetectorUpdate implements Writeable, ToXContentObject {
@ -523,6 +553,7 @@ public class JobUpdate implements Writeable, ToXContentObject {
private List<String> categorizationFilters;
private Map<String, Object> customSettings;
private String modelSnapshotId;
private Version modelSnapshotMinVersion;
private Long establishedModelMemory;
public Builder(String jobId) {
@ -594,6 +625,16 @@ public class JobUpdate implements Writeable, ToXContentObject {
return this;
}
public Builder setModelSnapshotMinVersion(Version modelSnapshotMinVersion) {
this.modelSnapshotMinVersion = modelSnapshotMinVersion;
return this;
}
public Builder setModelSnapshotMinVersion(String modelSnapshotMinVersion) {
this.modelSnapshotMinVersion = Version.fromString(modelSnapshotMinVersion);
return this;
}
public Builder setEstablishedModelMemory(Long establishedModelMemory) {
this.establishedModelMemory = establishedModelMemory;
return this;
@ -602,7 +643,7 @@ public class JobUpdate implements Writeable, ToXContentObject {
public JobUpdate build() {
return new JobUpdate(jobId, groups, description, detectorUpdates, modelPlotConfig, analysisLimits, backgroundPersistInterval,
renormalizationWindowDays, resultsRetentionDays, modelSnapshotRetentionDays, categorizationFilters, customSettings,
modelSnapshotId, establishedModelMemory);
modelSnapshotId, modelSnapshotMinVersion, establishedModelMemory);
}
}
}

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.core.ml.job.process.autodetect.state;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.Version;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
@ -44,6 +45,7 @@ public class ModelSnapshot implements ToXContentObject, Writeable {
public static final ParseField LATEST_RESULT_TIME = new ParseField("latest_result_time_stamp");
public static final ParseField QUANTILES = new ParseField("quantiles");
public static final ParseField RETAIN = new ParseField("retain");
public static final ParseField MIN_VERSION = new ParseField("min_version");
// Used for QueryPage
public static final ParseField RESULTS_FIELD = new ParseField("model_snapshots");
@ -57,6 +59,7 @@ public class ModelSnapshot implements ToXContentObject, Writeable {
static {
PARSER.declareString(Builder::setJobId, Job.ID);
PARSER.declareString(Builder::setMinVersion, MIN_VERSION);
PARSER.declareField(Builder::setTimestamp, p -> {
if (p.currentToken() == Token.VALUE_NUMBER) {
return new Date(p.longValue());
@ -91,7 +94,15 @@ public class ModelSnapshot implements ToXContentObject, Writeable {
PARSER.declareBoolean(Builder::setRetain, RETAIN);
}
private final String jobId;
/**
* The minimum version a node should have to be able
* to read this model snapshot.
*/
private final Version minVersion;
private final Date timestamp;
private final String description;
private final String snapshotId;
@ -102,10 +113,12 @@ public class ModelSnapshot implements ToXContentObject, Writeable {
private final Quantiles quantiles;
private final boolean retain;
private ModelSnapshot(String jobId, Date timestamp, String description, String snapshotId, int snapshotDocCount,
private ModelSnapshot(String jobId, Version minVersion, Date timestamp, String description, String snapshotId, int snapshotDocCount,
ModelSizeStats modelSizeStats, Date latestRecordTimeStamp, Date latestResultTimeStamp, Quantiles quantiles,
boolean retain) {
this.jobId = jobId;
this.minVersion = minVersion;
this.timestamp = timestamp;
this.description = description;
this.snapshotId = snapshotId;
@ -119,6 +132,11 @@ public class ModelSnapshot implements ToXContentObject, Writeable {
public ModelSnapshot(StreamInput in) throws IOException {
jobId = in.readString();
if (in.getVersion().onOrAfter(Version.V_6_3_0)) {
minVersion = Version.readVersion(in);
} else {
minVersion = Version.V_5_5_0;
}
timestamp = in.readBoolean() ? new Date(in.readVLong()) : null;
description = in.readOptionalString();
snapshotId = in.readOptionalString();
@ -133,6 +151,9 @@ public class ModelSnapshot implements ToXContentObject, Writeable {
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(jobId);
if (out.getVersion().onOrAfter(Version.V_6_3_0)) {
Version.writeVersion(minVersion, out);
}
if (timestamp != null) {
out.writeBoolean(true);
out.writeVLong(timestamp.getTime());
@ -163,6 +184,7 @@ public class ModelSnapshot implements ToXContentObject, Writeable {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(Job.ID.getPreferredName(), jobId);
builder.field(MIN_VERSION.getPreferredName(), minVersion);
if (timestamp != null) {
builder.dateField(TIMESTAMP.getPreferredName(), TIMESTAMP.getPreferredName() + "_string", timestamp.getTime());
}
@ -196,6 +218,10 @@ public class ModelSnapshot implements ToXContentObject, Writeable {
return jobId;
}
public Version getMinVersion() {
return minVersion;
}
public Date getTimestamp() {
return timestamp;
}
@ -230,8 +256,8 @@ public class ModelSnapshot implements ToXContentObject, Writeable {
@Override
public int hashCode() {
return Objects.hash(jobId, timestamp, description, snapshotId, quantiles, snapshotDocCount, modelSizeStats, latestRecordTimeStamp,
latestResultTimeStamp, retain);
return Objects.hash(jobId, minVersion, timestamp, description, snapshotId, quantiles, snapshotDocCount, modelSizeStats,
latestRecordTimeStamp, latestResultTimeStamp, retain);
}
/**
@ -250,6 +276,7 @@ public class ModelSnapshot implements ToXContentObject, Writeable {
ModelSnapshot that = (ModelSnapshot) other;
return Objects.equals(this.jobId, that.jobId)
&& Objects.equals(this.minVersion, that.minVersion)
&& Objects.equals(this.timestamp, that.timestamp)
&& Objects.equals(this.description, that.description)
&& Objects.equals(this.snapshotId, that.snapshotId)
@ -320,6 +347,12 @@ public class ModelSnapshot implements ToXContentObject, Writeable {
public static class Builder {
private String jobId;
// Stored snapshot documents created prior to 6.3.0 will have no
// value for min_version. We default it to 5.5.0 as there were
// no model changes between 5.5.0 and 6.3.0.
private Version minVersion = Version.V_5_5_0;
private Date timestamp;
private String description;
private String snapshotId;
@ -330,6 +363,7 @@ public class ModelSnapshot implements ToXContentObject, Writeable {
private Quantiles quantiles;
private boolean retain;
public Builder() {
}
@ -349,6 +383,7 @@ public class ModelSnapshot implements ToXContentObject, Writeable {
this.latestResultTimeStamp = modelSnapshot.latestResultTimeStamp;
this.quantiles = modelSnapshot.quantiles;
this.retain = modelSnapshot.retain;
this.minVersion = modelSnapshot.minVersion;
}
public Builder setJobId(String jobId) {
@ -356,6 +391,16 @@ public class ModelSnapshot implements ToXContentObject, Writeable {
return this;
}
public Builder setMinVersion(Version minVersion) {
this.minVersion = minVersion;
return this;
}
public Builder setMinVersion(String minVersion) {
this.minVersion = Version.fromString(minVersion);
return this;
}
public Builder setTimestamp(Date timestamp) {
this.timestamp = timestamp;
return this;
@ -407,8 +452,8 @@ public class ModelSnapshot implements ToXContentObject, Writeable {
}
public ModelSnapshot build() {
return new ModelSnapshot(jobId, timestamp, description, snapshotId, snapshotDocCount, modelSizeStats, latestRecordTimeStamp,
latestResultTimeStamp, quantiles, retain);
return new ModelSnapshot(jobId, minVersion, timestamp, description, snapshotId, snapshotDocCount, modelSizeStats,
latestRecordTimeStamp, latestResultTimeStamp, quantiles, retain);
}
}
}

View File

@ -661,6 +661,9 @@ public class JobTests extends AbstractSerializingTestCase<Job> {
if (randomBoolean()) {
builder.setModelSnapshotId(randomAlphaOfLength(10));
}
if (randomBoolean()) {
builder.setModelSnapshotMinVersion(Version.CURRENT);
}
if (randomBoolean()) {
builder.setResultsIndexName(randomValidJobId());
}

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.core.ml.job.config;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
@ -86,6 +87,9 @@ public class JobUpdateTests extends AbstractSerializingTestCase<JobUpdate> {
if (randomBoolean()) {
update.setModelSnapshotId(randomAlphaOfLength(10));
}
if (randomBoolean()) {
update.setModelSnapshotMinVersion(Version.CURRENT);
}
if (randomBoolean()) {
update.setEstablishedModelMemory(randomNonNegativeLong());
}

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.core.ml.job.process.autodetect.state;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentParser;
@ -126,6 +127,7 @@ public class ModelSnapshotTests extends AbstractSerializingTestCase<ModelSnapsho
private static ModelSnapshot.Builder createFullyPopulated() {
ModelSnapshot.Builder modelSnapshot = new ModelSnapshot.Builder();
modelSnapshot.setJobId("foo");
modelSnapshot.setMinVersion(Version.CURRENT);
modelSnapshot.setTimestamp(DEFAULT_TIMESTAMP);
modelSnapshot.setDescription(DEFAULT_DESCRIPTION);
modelSnapshot.setSnapshotId(DEFAULT_ID);
@ -147,6 +149,7 @@ public class ModelSnapshotTests extends AbstractSerializingTestCase<ModelSnapsho
public static ModelSnapshot createRandomized() {
ModelSnapshot.Builder modelSnapshot = new ModelSnapshot.Builder(randomAlphaOfLengthBetween(1, 20));
modelSnapshot.setMinVersion(Version.CURRENT);
modelSnapshot.setTimestamp(new Date(TimeValue.parseTimeValue(randomTimeValue(), "test").millis()));
modelSnapshot.setDescription(randomAlphaOfLengthBetween(1, 20));
modelSnapshot.setSnapshotId(randomAlphaOfLengthBetween(1, 20));

View File

@ -179,6 +179,14 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
continue;
}
if (nodeSupportsModelSnapshotVersion(node, job) == false) {
String reason = "Not opening job [" + jobId + "] on node [" + node + "], because the job's model snapshot requires " +
"a node of version [" + job.getModelSnapshotMinVersion() + "] or higher";
logger.trace(reason);
reasons.add(reason);
continue;
}
long numberOfAssignedJobs = 0;
int numberOfAllocatingJobs = 0;
long assignedJobMemory = 0;
@ -318,6 +326,15 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
return nodeVersion.onOrAfter(Version.V_5_5_0);
}
private static boolean nodeSupportsModelSnapshotVersion(DiscoveryNode node, Job job) {
if (job.getModelSnapshotId() == null || job.getModelSnapshotMinVersion() == null) {
// There is no snapshot to restore or the min model snapshot version is 5.5.0
// which is OK as we have already checked the node is >= 5.5.0.
return true;
}
return node.getVersion().onOrAfter(job.getModelSnapshotMinVersion());
}
static String[] mappingRequiresUpdate(ClusterState state, String[] concreteIndices, Version minVersion,
Logger logger) throws IOException {
List<String> indicesToUpdate = new ArrayList<>();

View File

@ -75,7 +75,7 @@ public class AutoDetectResultProcessor {
private final boolean restoredSnapshot;
final CountDownLatch completionLatch = new CountDownLatch(1);
final Semaphore updateModelSnapshotIdSemaphore = new Semaphore(1);
final Semaphore updateModelSnapshotSemaphore = new Semaphore(1);
private final FlushListener flushListener;
private volatile boolean processKilled;
private volatile boolean failed;
@ -262,7 +262,7 @@ public class AutoDetectResultProcessor {
if (modelSnapshot != null) {
// We need to refresh in order for the snapshot to be available when we try to update the job with it
persister.persistModelSnapshot(modelSnapshot, WriteRequest.RefreshPolicy.IMMEDIATE);
updateModelSnapshotIdOnJob(modelSnapshot);
updateModelSnapshotOnJob(modelSnapshot);
}
Quantiles quantiles = result.getQuantiles();
if (quantiles != null) {
@ -293,15 +293,18 @@ public class AutoDetectResultProcessor {
}
}
protected void updateModelSnapshotIdOnJob(ModelSnapshot modelSnapshot) {
JobUpdate update = new JobUpdate.Builder(jobId).setModelSnapshotId(modelSnapshot.getSnapshotId()).build();
protected void updateModelSnapshotOnJob(ModelSnapshot modelSnapshot) {
JobUpdate update = new JobUpdate.Builder(jobId)
.setModelSnapshotId(modelSnapshot.getSnapshotId())
.setModelSnapshotMinVersion(modelSnapshot.getMinVersion())
.build();
UpdateJobAction.Request updateRequest = UpdateJobAction.Request.internal(jobId, update);
try {
// This blocks the main processing thread in the unlikely event
// there are 2 model snapshots queued up. But it also has the
// advantage of ensuring order
updateModelSnapshotIdSemaphore.acquire();
updateModelSnapshotSemaphore.acquire();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOGGER.info("[{}] Interrupted acquiring update model snapshot semaphore", jobId);
@ -311,13 +314,13 @@ public class AutoDetectResultProcessor {
executeAsyncWithOrigin(client, ML_ORIGIN, UpdateJobAction.INSTANCE, updateRequest, new ActionListener<PutJobAction.Response>() {
@Override
public void onResponse(PutJobAction.Response response) {
updateModelSnapshotIdSemaphore.release();
updateModelSnapshotSemaphore.release();
LOGGER.debug("[{}] Updated job with model snapshot id [{}]", jobId, modelSnapshot.getSnapshotId());
}
@Override
public void onFailure(Exception e) {
updateModelSnapshotIdSemaphore.release();
updateModelSnapshotSemaphore.release();
LOGGER.error("[" + jobId + "] Failed to update job with new model snapshot id [" +
modelSnapshot.getSnapshotId() + "]", e);
}
@ -355,9 +358,9 @@ public class AutoDetectResultProcessor {
throw new TimeoutException("Timed out waiting for results processor to complete for job " + jobId);
}
// Input stream has been completely processed at this point.
// Wait for any updateModelSnapshotIdOnJob calls to complete.
updateModelSnapshotIdSemaphore.acquire();
updateModelSnapshotIdSemaphore.release();
// Wait for any updateModelSnapshotOnJob calls to complete.
updateModelSnapshotSemaphore.acquire();
updateModelSnapshotSemaphore.release();
// These lines ensure that the "completion" we're awaiting includes making the results searchable
waitUntilRenormalizerIsIdle();

View File

@ -27,6 +27,8 @@ import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
@ -41,8 +43,6 @@ import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields;
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
import org.elasticsearch.xpack.core.ml.notifications.AuditorField;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase;
@ -355,6 +355,38 @@ public class TransportOpenJobActionTests extends ESTestCase {
assertNull(result.getExecutorNode());
}
public void testSelectLeastLoadedMlNode_noNodesMatchingModelSnapshotMinVersion() {
Map<String, String> nodeAttr = new HashMap<>();
nodeAttr.put(MachineLearning.ML_ENABLED_NODE_ATTR, "true");
DiscoveryNodes nodes = DiscoveryNodes.builder()
.add(new DiscoveryNode("_node_name1", "_node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
nodeAttr, Collections.emptySet(), Version.V_6_2_0))
.add(new DiscoveryNode("_node_name2", "_node_id2", new TransportAddress(InetAddress.getLoopbackAddress(), 9301),
nodeAttr, Collections.emptySet(), Version.V_6_1_0))
.build();
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
addJobTask("job_with_incompatible_model_snapshot", "_node_id1", null, tasksBuilder);
PersistentTasksCustomMetaData tasks = tasksBuilder.build();
ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name"));
MetaData.Builder metaData = MetaData.builder();
RoutingTable.Builder routingTable = RoutingTable.builder();
addJobAndIndices(metaData, routingTable, jobId -> BaseMlIntegTestCase.createFareQuoteJob(jobId)
.setModelSnapshotId("incompatible_snapshot")
.setModelSnapshotMinVersion(Version.V_6_3_0)
.build(new Date()), "job_with_incompatible_model_snapshot");
cs.nodes(nodes);
metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks);
cs.metaData(metaData);
cs.routingTable(routingTable.build());
Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_with_incompatible_model_snapshot", cs.build(),
2, 10, 30, logger);
assertThat(result.getExplanation(), containsString(
"because the job's model snapshot requires a node of version [6.3.0] or higher"));
assertNull(result.getExecutorNode());
}
public void testVerifyIndicesPrimaryShardsAreActive() {
MetaData.Builder metaData = MetaData.builder();
RoutingTable.Builder routingTable = RoutingTable.builder();

View File

@ -102,7 +102,7 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
resultProcessor = new AutoDetectResultProcessor(client(), JOB_ID, renormalizer,
new JobResultsPersister(nodeSettings(), client()), jobProvider, new ModelSizeStats.Builder(JOB_ID).build(), false) {
@Override
protected void updateModelSnapshotIdOnJob(ModelSnapshot modelSnapshot) {
protected void updateModelSnapshotOnJob(ModelSnapshot modelSnapshot) {
capturedUpdateModelSnapshotOnJobRequests.add(modelSnapshot);
}
};

View File

@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml.job.process.autodetect.output;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
@ -314,13 +315,15 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
context.deleteInterimRequired = false;
AutodetectResult result = mock(AutodetectResult.class);
ModelSnapshot modelSnapshot = new ModelSnapshot.Builder(JOB_ID)
.setSnapshotId("a_snapshot_id").build();
.setSnapshotId("a_snapshot_id")
.setMinVersion(Version.CURRENT)
.build();
when(result.getModelSnapshot()).thenReturn(modelSnapshot);
processorUnderTest.processResult(context, result);
verify(persister, times(1)).persistModelSnapshot(modelSnapshot, WriteRequest.RefreshPolicy.IMMEDIATE);
UpdateJobAction.Request expectedJobUpdateRequest = UpdateJobAction.Request.internal(JOB_ID,
new JobUpdate.Builder(JOB_ID).setModelSnapshotId("a_snapshot_id").build());
new JobUpdate.Builder(JOB_ID).setModelSnapshotId("a_snapshot_id").setModelSnapshotMinVersion(Version.CURRENT).build());
verify(client).execute(same(UpdateJobAction.INSTANCE), eq(expectedJobUpdateRequest), any());
verifyNoMoreInteractions(persister);
@ -378,7 +381,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
processorUnderTest.awaitCompletion();
assertEquals(0, processorUnderTest.completionLatch.getCount());
assertEquals(1, processorUnderTest.updateModelSnapshotIdSemaphore.availablePermits());
assertEquals(1, processorUnderTest.updateModelSnapshotSemaphore.availablePermits());
}
public void testPersisterThrowingDoesntBlockProcessing() {
@ -433,7 +436,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
processorUnderTest.awaitCompletion();
assertEquals(0, processorUnderTest.completionLatch.getCount());
assertEquals(1, processorUnderTest.updateModelSnapshotIdSemaphore.availablePermits());
assertEquals(1, processorUnderTest.updateModelSnapshotSemaphore.availablePermits());
verify(persister, times(1)).commitResultWrites(JOB_ID);
verify(persister, times(1)).commitStateWrites(JOB_ID);