[ML] Add job version (elastic/x-pack-elasticsearch#1437)
relates elastic/x-pack-elasticsearch#1396 Original commit: elastic/x-pack-elasticsearch@3148c76d7f
This commit is contained in:
parent
e047598ca9
commit
dda456fb76
|
@ -5,7 +5,9 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.ml.job.config;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.AbstractDiffable;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
|
@ -50,6 +52,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
|
|||
*/
|
||||
public static final ParseField ID = new ParseField("job_id");
|
||||
public static final ParseField JOB_TYPE = new ParseField("job_type");
|
||||
public static final ParseField JOB_VERSION = new ParseField("job_version");
|
||||
public static final ParseField ANALYSIS_CONFIG = new ParseField("analysis_config");
|
||||
public static final ParseField ANALYSIS_LIMITS = new ParseField("analysis_limits");
|
||||
public static final ParseField CREATE_TIME = new ParseField("create_time");
|
||||
|
@ -80,6 +83,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
|
|||
static {
|
||||
PARSER.declareString(Builder::setId, ID);
|
||||
PARSER.declareString(Builder::setJobType, JOB_TYPE);
|
||||
PARSER.declareString(Builder::setJobVersion, JOB_VERSION);
|
||||
PARSER.declareStringOrNull(Builder::setDescription, DESCRIPTION);
|
||||
PARSER.declareField(Builder::setCreateTime, p -> {
|
||||
if (p.currentToken() == Token.VALUE_NUMBER) {
|
||||
|
@ -124,6 +128,14 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
|
|||
|
||||
private final String jobId;
|
||||
private final String jobType;
|
||||
|
||||
/**
|
||||
* The version when the job was created.
|
||||
* Will be null for versions before 5.5.
|
||||
*/
|
||||
@Nullable
|
||||
private final Version jobVersion;
|
||||
|
||||
private final String description;
|
||||
// TODO: Use java.time for the Dates here: x-pack-elasticsearch#829
|
||||
private final Date createTime;
|
||||
|
@ -142,7 +154,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
|
|||
private final String resultsIndexName;
|
||||
private final boolean deleted;
|
||||
|
||||
private Job(String jobId, String jobType, String description, Date createTime,
|
||||
private Job(String jobId, String jobType, Version jobVersion, String description, Date createTime,
|
||||
Date finishedTime, Date lastDataTime,
|
||||
AnalysisConfig analysisConfig, AnalysisLimits analysisLimits, DataDescription dataDescription,
|
||||
ModelPlotConfig modelPlotConfig, Long renormalizationWindowDays, TimeValue backgroundPersistInterval,
|
||||
|
@ -151,6 +163,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
|
|||
|
||||
this.jobId = jobId;
|
||||
this.jobType = jobType;
|
||||
this.jobVersion = jobVersion;
|
||||
this.description = description;
|
||||
this.createTime = createTime;
|
||||
this.finishedTime = finishedTime;
|
||||
|
@ -172,6 +185,11 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
|
|||
public Job(StreamInput in) throws IOException {
|
||||
jobId = in.readString();
|
||||
jobType = in.readString();
|
||||
if (in.getVersion().onOrAfter(Version.V_5_5_0_UNRELEASED)) {
|
||||
jobVersion = in.readBoolean() ? Version.readVersion(in) : null;
|
||||
} else {
|
||||
jobVersion = null;
|
||||
}
|
||||
description = in.readOptionalString();
|
||||
createTime = new Date(in.readVLong());
|
||||
finishedTime = in.readBoolean() ? new Date(in.readVLong()) : null;
|
||||
|
@ -203,6 +221,10 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
|
|||
return jobType;
|
||||
}
|
||||
|
||||
public Version getJobVersion() {
|
||||
return jobVersion;
|
||||
}
|
||||
|
||||
/**
|
||||
* The name of the index storing the job's results and state.
|
||||
* This defaults to {@link #getId()} if a specific index name is not set.
|
||||
|
@ -372,6 +394,14 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
|
|||
public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeString(jobId);
|
||||
out.writeString(jobType);
|
||||
if (out.getVersion().onOrAfter(Version.V_5_5_0_UNRELEASED)) {
|
||||
if (jobVersion != null) {
|
||||
out.writeBoolean(true);
|
||||
Version.writeVersion(jobVersion, out);
|
||||
} else {
|
||||
out.writeBoolean(false);
|
||||
}
|
||||
}
|
||||
out.writeOptionalString(description);
|
||||
out.writeVLong(createTime.getTime());
|
||||
if (finishedTime != null) {
|
||||
|
@ -413,6 +443,9 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
|
|||
|
||||
builder.field(ID.getPreferredName(), jobId);
|
||||
builder.field(JOB_TYPE.getPreferredName(), jobType);
|
||||
if (jobVersion != null) {
|
||||
builder.field(JOB_VERSION.getPreferredName(), jobVersion);
|
||||
}
|
||||
if (description != null) {
|
||||
builder.field(DESCRIPTION.getPreferredName(), description);
|
||||
}
|
||||
|
@ -471,7 +504,10 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
|
|||
}
|
||||
|
||||
Job that = (Job) other;
|
||||
return Objects.equals(this.jobId, that.jobId) && Objects.equals(this.description, that.description)
|
||||
return Objects.equals(this.jobId, that.jobId)
|
||||
&& Objects.equals(this.jobType, that.jobType)
|
||||
&& Objects.equals(this.jobVersion, that.jobVersion)
|
||||
&& Objects.equals(this.description, that.description)
|
||||
&& Objects.equals(this.createTime, that.createTime)
|
||||
&& Objects.equals(this.finishedTime, that.finishedTime)
|
||||
&& Objects.equals(this.lastDataTime, that.lastDataTime)
|
||||
|
@ -490,13 +526,13 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
|
|||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(jobId, description, createTime, finishedTime, lastDataTime, analysisConfig,
|
||||
return Objects.hash(jobId, jobType, jobVersion, description, createTime, finishedTime, lastDataTime, analysisConfig,
|
||||
analysisLimits, dataDescription, modelPlotConfig, renormalizationWindowDays,
|
||||
backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, customSettings,
|
||||
modelSnapshotId, resultsIndexName, deleted);
|
||||
}
|
||||
|
||||
// Class alreadt extends from AbstractDiffable, so copied from ToXContentToBytes#toString()
|
||||
// Class already extends from AbstractDiffable, so copied from ToXContentToBytes#toString()
|
||||
@Override
|
||||
public final String toString() {
|
||||
return Strings.toString(this);
|
||||
|
@ -512,6 +548,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
|
|||
|
||||
private String id;
|
||||
private String jobType = ANOMALY_DETECTOR_JOB_TYPE;
|
||||
private Version jobVersion;
|
||||
private String description;
|
||||
private AnalysisConfig analysisConfig;
|
||||
private AnalysisLimits analysisLimits;
|
||||
|
@ -538,6 +575,8 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
|
|||
|
||||
public Builder(Job job) {
|
||||
this.id = job.getId();
|
||||
this.jobType = job.getJobType();
|
||||
this.jobVersion = job.getJobVersion();
|
||||
this.description = job.getDescription();
|
||||
this.analysisConfig = job.getAnalysisConfig();
|
||||
this.analysisLimits = job.getAnalysisLimits();
|
||||
|
@ -559,6 +598,9 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
|
|||
public Builder(StreamInput in) throws IOException {
|
||||
id = in.readOptionalString();
|
||||
jobType = in.readString();
|
||||
if (in.getVersion().onOrAfter(Version.V_5_5_0_UNRELEASED)) {
|
||||
jobVersion = in.readBoolean() ? Version.readVersion(in) : null;
|
||||
}
|
||||
description = in.readOptionalString();
|
||||
createTime = in.readBoolean() ? new Date(in.readVLong()) : null;
|
||||
finishedTime = in.readBoolean() ? new Date(in.readVLong()) : null;
|
||||
|
@ -586,6 +628,14 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
|
|||
return id;
|
||||
}
|
||||
|
||||
public void setJobVersion(Version jobVersion) {
|
||||
this.jobVersion = jobVersion;
|
||||
}
|
||||
|
||||
private void setJobVersion(String jobVersion) {
|
||||
this.jobVersion = Version.fromString(jobVersion);
|
||||
}
|
||||
|
||||
private void setJobType(String jobType) {
|
||||
this.jobType = jobType;
|
||||
}
|
||||
|
@ -691,6 +741,14 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
|
|||
public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeOptionalString(id);
|
||||
out.writeString(jobType);
|
||||
if (out.getVersion().onOrAfter(Version.V_5_5_0_UNRELEASED)) {
|
||||
if (jobVersion != null) {
|
||||
out.writeBoolean(true);
|
||||
Version.writeVersion(jobVersion, out);
|
||||
} else {
|
||||
out.writeBoolean(false);
|
||||
}
|
||||
}
|
||||
out.writeOptionalString(description);
|
||||
if (createTime != null) {
|
||||
out.writeBoolean(true);
|
||||
|
@ -731,6 +789,9 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
|
|||
builder.field(ID.getPreferredName(), id);
|
||||
}
|
||||
builder.field(JOB_TYPE.getPreferredName(), jobType);
|
||||
if (jobVersion != null) {
|
||||
builder.field(JOB_VERSION.getPreferredName(), jobVersion);
|
||||
}
|
||||
if (description != null) {
|
||||
builder.field(DESCRIPTION.getPreferredName(), description);
|
||||
}
|
||||
|
@ -791,6 +852,8 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
|
|||
|
||||
Job.Builder that = (Job.Builder) o;
|
||||
return Objects.equals(this.id, that.id)
|
||||
&& Objects.equals(this.jobType, that.jobType)
|
||||
&& Objects.equals(this.jobVersion, that.jobVersion)
|
||||
&& Objects.equals(this.description, that.description)
|
||||
&& Objects.equals(this.analysisConfig, that.analysisConfig)
|
||||
&& Objects.equals(this.analysisLimits, that.analysisLimits)
|
||||
|
@ -811,9 +874,9 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
|
|||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(id, description, analysisConfig, analysisLimits, dataDescription, createTime, finishedTime,
|
||||
lastDataTime, modelPlotConfig, renormalizationWindowDays, backgroundPersistInterval, modelSnapshotRetentionDays,
|
||||
resultsRetentionDays, customSettings, modelSnapshotId, resultsIndexName, deleted);
|
||||
return Objects.hash(id, jobType, jobVersion, description, analysisConfig, analysisLimits, dataDescription, createTime,
|
||||
finishedTime, lastDataTime, modelPlotConfig, renormalizationWindowDays, backgroundPersistInterval,
|
||||
modelSnapshotRetentionDays, resultsRetentionDays, customSettings, modelSnapshotId, resultsIndexName, deleted);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -851,11 +914,26 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
|
|||
// Creation time is NOT required in user input, hence validated only on build
|
||||
}
|
||||
|
||||
/**
|
||||
* Builds a job with the given {@code createTime} and the current version.
|
||||
* This should be used when a new job is created as opposed to {@link #build()}.
|
||||
*
|
||||
* @param createTime The time this job was created
|
||||
* @return The job
|
||||
*/
|
||||
public Job build(Date createTime) {
|
||||
setCreateTime(createTime);
|
||||
setJobVersion(Version.CURRENT);
|
||||
return build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Builds a job.
|
||||
* This should be used when an existing job is being built
|
||||
* as opposed to {@link #build(Date)}.
|
||||
*
|
||||
* @return The job
|
||||
*/
|
||||
public Job build() {
|
||||
|
||||
validateInputFields();
|
||||
|
@ -874,7 +952,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
|
|||
}
|
||||
|
||||
return new Job(
|
||||
id, jobType, description, createTime, finishedTime, lastDataTime,
|
||||
id, jobType, jobVersion, description, createTime, finishedTime, lastDataTime,
|
||||
analysisConfig, analysisLimits,
|
||||
dataDescription, modelPlotConfig, renormalizationWindowDays, backgroundPersistInterval,
|
||||
modelSnapshotRetentionDays, resultsRetentionDays, customSettings, modelSnapshotId,
|
||||
|
|
|
@ -5,11 +5,13 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.ml.integration;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
||||
import org.elasticsearch.action.admin.cluster.node.hotthreads.NodeHotThreads;
|
||||
import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsResponse;
|
||||
import org.elasticsearch.xpack.ml.action.GetDatafeedsStatsAction;
|
||||
import org.elasticsearch.xpack.ml.action.GetJobsStatsAction;
|
||||
import org.elasticsearch.xpack.ml.action.PutJobAction;
|
||||
import org.elasticsearch.xpack.ml.action.StopDatafeedAction;
|
||||
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
|
||||
import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
|
||||
|
@ -55,11 +57,11 @@ public class DatafeedJobsIT extends MlNativeAutodetectIntegTestCase {
|
|||
|
||||
Job.Builder job = createScheduledJob("lookback-job");
|
||||
registerJob(job);
|
||||
assertTrue(putJob(job).isAcknowledged());
|
||||
PutJobAction.Response putJobResponse = putJob(job);
|
||||
assertTrue(putJobResponse.isAcknowledged());
|
||||
assertThat(putJobResponse.getResponse().getJobVersion(), equalTo(Version.CURRENT));
|
||||
openJob(job.getId());
|
||||
assertBusy(() -> {
|
||||
GetJobsStatsAction.Response statsResponse =
|
||||
client().execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request(job.getId())).actionGet();
|
||||
assertEquals(getJobStats(job.getId()).get(0).getState(), JobState.OPENED);
|
||||
});
|
||||
|
||||
|
@ -98,9 +100,7 @@ public class DatafeedJobsIT extends MlNativeAutodetectIntegTestCase {
|
|||
assertTrue(putJob(job).isAcknowledged());
|
||||
openJob(job.getId());
|
||||
assertBusy(() -> {
|
||||
GetJobsStatsAction.Response statsResponse =
|
||||
client().execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request(job.getId())).actionGet();
|
||||
assertEquals(statsResponse.getResponse().results().get(0).getState(), JobState.OPENED);
|
||||
assertEquals(getJobStats(job.getId()).get(0).getState(), JobState.OPENED);
|
||||
});
|
||||
|
||||
DatafeedConfig datafeedConfig = createDatafeed(job.getId() + "-datafeed", job.getId(), Collections.singletonList("data"));
|
||||
|
|
|
@ -6,14 +6,19 @@
|
|||
package org.elasticsearch.xpack.ml.job.config;
|
||||
|
||||
import com.carrotsearch.randomizedtesting.generators.CodepointSetGenerator;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.xpack.ml.job.messages.Messages;
|
||||
import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex;
|
||||
import org.elasticsearch.xpack.ml.support.AbstractSerializingTestCase;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
|
@ -22,6 +27,8 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
|
||||
public class JobTests extends AbstractSerializingTestCase<Job> {
|
||||
|
||||
|
@ -345,18 +352,33 @@ public class JobTests extends AbstractSerializingTestCase<Job> {
|
|||
assertEquals(AnomalyDetectorsIndex.RESULTS_INDEX_PREFIX + "custom-carol", job.getResultsIndexName());
|
||||
}
|
||||
|
||||
public void testBuilder_withInvalidIndexNameThrows () {
|
||||
public void testBuilder_withInvalidIndexNameThrows() {
|
||||
Job.Builder builder = buildJobBuilder("foo");
|
||||
builder.setResultsIndexName("_bad^name");
|
||||
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> builder.build());
|
||||
assertEquals(Messages.getMessage(Messages.INVALID_ID, Job.RESULTS_INDEX_NAME.getPreferredName(), "_bad^name"), e.getMessage());
|
||||
}
|
||||
|
||||
public void testBuilder_buildWithCreateTime () {
|
||||
public void testBuilder_buildWithCreateTime() {
|
||||
Job.Builder builder = buildJobBuilder("foo");
|
||||
Date now = new Date();
|
||||
Job job = builder.build(now);
|
||||
assertEquals(now, job.getCreateTime());
|
||||
assertEquals(Version.CURRENT, job.getJobVersion());
|
||||
}
|
||||
|
||||
public void testJobWithoutVersion() throws IOException {
|
||||
Job.Builder builder = buildJobBuilder("foo");
|
||||
Job job = builder.build();
|
||||
assertThat(job.getJobVersion(), is(nullValue()));
|
||||
|
||||
// Assert parsing a job without version works as expected
|
||||
XContentBuilder xContentBuilder = toXContent(job, randomFrom(XContentType.values()));
|
||||
try (XContentParser parser = XContentFactory.xContent(xContentBuilder.bytes())
|
||||
.createParser(NAMED_X_CONTENT_REGISTRY, xContentBuilder.bytes())) {
|
||||
Job parsed = Job.PARSER.apply(parser, null).build();
|
||||
assertThat(parsed, equalTo(job));
|
||||
}
|
||||
}
|
||||
|
||||
public void testBuilder_buildRequiresDataDescription() {
|
||||
|
@ -400,6 +422,9 @@ public class JobTests extends AbstractSerializingTestCase<Job> {
|
|||
if (randomBoolean()) {
|
||||
builder.setDescription(randomAlphaOfLength(10));
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
builder.setJobVersion(Version.CURRENT);
|
||||
}
|
||||
builder.setCreateTime(new Date(randomNonNegativeLong()));
|
||||
if (randomBoolean()) {
|
||||
builder.setFinishedTime(new Date(randomNonNegativeLong()));
|
||||
|
|
Loading…
Reference in New Issue