[ML] Set job create time on server (elastic/x-pack-elasticsearch#910)

* [ML] Set job create time on server

* Job.Builder serialisation tests

* Make setCreateTime package private

Original commit: elastic/x-pack-elasticsearch@d2d75e0d7b
Authored by David Kyle on 2017-04-03 18:30:47 +01:00; committed by GitHub
parent 758b689f51
commit 5b66c7a7ba
31 changed files with 412 additions and 113 deletions
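
In short, the server now owns the job create time: PutJobAction.Request carries a Job.Builder rather than a built Job, setCreateTime becomes package private, and JobManager stamps the time with build(new Date()) when the job is stored. The standalone sketch below illustrates that pattern with simplified, hypothetical classes (SimpleJob is not an x-pack type); it is a minimal illustration of the idea, not code from this commit.

import java.util.Date;

// Hypothetical stand-in for Job/Job.Builder, only to show the create-time flow.
final class SimpleJob {
    private final String id;
    private final Date createTime;

    SimpleJob(String id, Date createTime) {
        this.id = id;
        this.createTime = createTime;
    }

    Date getCreateTime() {
        return createTime;
    }

    static final class Builder {
        private String id;

        Builder setId(String id) {
            this.id = id;
            return this;
        }

        // No public setCreateTime: the time is supplied when the server builds the job.
        SimpleJob build(Date createTime) {
            return new SimpleJob(id, createTime);
        }
    }
}

public class CreateTimeOnServerSketch {
    public static void main(String[] args) {
        // The request ships the builder; the "server" picks the create time.
        SimpleJob.Builder request = new SimpleJob.Builder().setId("foo");
        SimpleJob job = request.build(new Date());
        System.out.println("create time set on server: " + job.getCreateTime());
    }
}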

View File

@ -39,7 +39,6 @@ import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.messages.Messages;
import java.io.IOException;
import java.util.Date;
import java.util.Objects;
public class PutJobAction extends Action<PutJobAction.Request, PutJobAction.Response, PutJobAction.RequestBuilder> {
@ -72,22 +71,20 @@ public class PutJobAction extends Action<PutJobAction.Request, PutJobAction.Resp
throw new IllegalArgumentException(Messages.getMessage(Messages.INCONSISTENT_ID, Job.ID.getPreferredName(),
job.getId(), jobId));
}
if (job.getCreateTime() == null) {
job.setCreateTime(new Date());
}
return new Request(job.build());
return new Request(job);
}
private Job job;
private Job.Builder job;
public Request(Job job) {
public Request(Job.Builder job) {
this.job = job;
}
Request() {
}
public Job getJob() {
public Job.Builder getJob() {
return job;
}
@ -99,7 +96,7 @@ public class PutJobAction extends Action<PutJobAction.Request, PutJobAction.Resp
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
job = new Job(in);
job = new Job.Builder(in);
}
@Override

View File

@ -68,10 +68,8 @@ extends Action<ValidateJobConfigAction.Request, ValidateJobConfigAction.Response
// When jobs are PUT their ID must be supplied in the URL - assume this will
// be valid unless an invalid job ID is specified in the JSON to be validated
job.setId(job.getId() != null ? job.getId() : "ok");
if (job.getCreateTime() == null) {
job.setCreateTime(new Date());
}
return new Request(job.build());
return new Request(job.getCreateTime() == null ? job.build(new Date()) : job.build());
}
Request() {

View File

@ -42,6 +42,7 @@ import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
import java.io.IOException;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@ -163,7 +164,7 @@ public class JobManager extends AbstractComponent {
* Stores a job in the cluster state
*/
public void putJob(PutJobAction.Request request, ClusterState state, ActionListener<PutJobAction.Response> actionListener) {
Job job = request.getJob();
Job job = request.getJob().build(new Date());
ActionListener<Boolean> createResultsIndexListener = ActionListener.wrap(jobSaved ->
jobProvider.createJobResultIndex(job, state, new ActionListener<Boolean>() {

View File

@ -494,11 +494,10 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
}
}
public static class Builder {
public static class Builder implements Writeable, ToXContent {
private String id;
private String description;
private AnalysisConfig analysisConfig;
private AnalysisLimits analysisLimits;
private DataDescription dataDescription;
@ -542,6 +541,26 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
this.deleted = job.isDeleted();
}
public Builder(StreamInput in) throws IOException {
id = in.readOptionalString();
description = in.readOptionalString();
createTime = in.readBoolean() ? new Date(in.readVLong()) : null;
finishedTime = in.readBoolean() ? new Date(in.readVLong()) : null;
lastDataTime = in.readBoolean() ? new Date(in.readVLong()) : null;
analysisConfig = in.readOptionalWriteable(AnalysisConfig::new);
analysisLimits = in.readOptionalWriteable(AnalysisLimits::new);
dataDescription = in.readOptionalWriteable(DataDescription::new);
modelPlotConfig = in.readOptionalWriteable(ModelPlotConfig::new);
renormalizationWindowDays = in.readOptionalLong();
backgroundPersistInterval = in.readOptionalWriteable(TimeValue::new);
modelSnapshotRetentionDays = in.readOptionalLong();
resultsRetentionDays = in.readOptionalLong();
customSettings = in.readMap();
modelSnapshotId = in.readOptionalString();
resultsIndexName = in.readOptionalString();
deleted = in.readBoolean();
}
public Builder setId(String id) {
this.id = id;
return this;
@ -584,7 +603,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
return this;
}
public Builder setCreateTime(Date createTime) {
Builder setCreateTime(Date createTime) {
this.createTime = createTime;
return this;
}
@ -648,6 +667,138 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
return this;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalString(id);
out.writeOptionalString(description);
if (createTime != null) {
out.writeBoolean(true);
out.writeVLong(createTime.getTime());
} else {
out.writeBoolean(false);
}
if (finishedTime != null) {
out.writeBoolean(true);
out.writeVLong(finishedTime.getTime());
} else {
out.writeBoolean(false);
}
if (lastDataTime != null) {
out.writeBoolean(true);
out.writeVLong(lastDataTime.getTime());
} else {
out.writeBoolean(false);
}
out.writeOptionalWriteable(analysisConfig);
out.writeOptionalWriteable(analysisLimits);
out.writeOptionalWriteable(dataDescription);
out.writeOptionalWriteable(modelPlotConfig);
out.writeOptionalLong(renormalizationWindowDays);
out.writeOptionalWriteable(backgroundPersistInterval);
out.writeOptionalLong(modelSnapshotRetentionDays);
out.writeOptionalLong(resultsRetentionDays);
out.writeMap(customSettings);
out.writeOptionalString(modelSnapshotId);
out.writeOptionalString(resultsIndexName);
out.writeBoolean(deleted);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
if (id != null) {
builder.field(ID.getPreferredName(), id);
}
if (description != null) {
builder.field(DESCRIPTION.getPreferredName(), description);
}
if (createTime != null) {
builder.field(CREATE_TIME.getPreferredName(), createTime.getTime());
}
if (finishedTime != null) {
builder.field(FINISHED_TIME.getPreferredName(), finishedTime.getTime());
}
if (lastDataTime != null) {
builder.field(LAST_DATA_TIME.getPreferredName(), lastDataTime.getTime());
}
if (analysisConfig != null) {
builder.field(ANALYSIS_CONFIG.getPreferredName(), analysisConfig, params);
}
if (analysisLimits != null) {
builder.field(ANALYSIS_LIMITS.getPreferredName(), analysisLimits, params);
}
if (dataDescription != null) {
builder.field(DATA_DESCRIPTION.getPreferredName(), dataDescription, params);
}
if (modelPlotConfig != null) {
builder.field(MODEL_PLOT_CONFIG.getPreferredName(), modelPlotConfig, params);
}
if (renormalizationWindowDays != null) {
builder.field(RENORMALIZATION_WINDOW_DAYS.getPreferredName(), renormalizationWindowDays);
}
if (backgroundPersistInterval != null) {
builder.field(BACKGROUND_PERSIST_INTERVAL.getPreferredName(), backgroundPersistInterval.getStringRep());
}
if (modelSnapshotRetentionDays != null) {
builder.field(MODEL_SNAPSHOT_RETENTION_DAYS.getPreferredName(), modelSnapshotRetentionDays);
}
if (resultsRetentionDays != null) {
builder.field(RESULTS_RETENTION_DAYS.getPreferredName(), resultsRetentionDays);
}
if (customSettings != null) {
builder.field(CUSTOM_SETTINGS.getPreferredName(), customSettings);
}
if (modelSnapshotId != null) {
builder.field(MODEL_SNAPSHOT_ID.getPreferredName(), modelSnapshotId);
}
if (resultsIndexName != null) {
builder.field(RESULTS_INDEX_NAME.getPreferredName(), resultsIndexName);
}
if (params.paramAsBoolean("all", false)) {
builder.field(DELETED.getPreferredName(), deleted);
}
builder.endObject();
return builder;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Job.Builder that = (Job.Builder) o;
return Objects.equals(this.id, that.id)
&& Objects.equals(this.description, that.description)
&& Objects.equals(this.analysisConfig, that.analysisConfig)
&& Objects.equals(this.analysisLimits, that.analysisLimits)
&& Objects.equals(this.dataDescription, that.dataDescription)
&& Objects.equals(this.createTime, that.createTime)
&& Objects.equals(this.finishedTime, that.finishedTime)
&& Objects.equals(this.lastDataTime, that.lastDataTime)
&& Objects.equals(this.modelPlotConfig, that.modelPlotConfig)
&& Objects.equals(this.renormalizationWindowDays, that.renormalizationWindowDays)
&& Objects.equals(this.backgroundPersistInterval, that.backgroundPersistInterval)
&& Objects.equals(this.modelSnapshotRetentionDays, that.modelSnapshotRetentionDays)
&& Objects.equals(this.resultsRetentionDays, that.resultsRetentionDays)
&& Objects.equals(this.customSettings, that.customSettings)
&& Objects.equals(this.modelSnapshotId, that.modelSnapshotId)
&& Objects.equals(this.resultsIndexName, that.resultsIndexName)
&& Objects.equals(this.deleted, that.deleted);
}
@Override
public int hashCode() {
return Objects.hash(id, description, analysisConfig, analysisLimits, dataDescription, createTime, finishedTime,
lastDataTime, modelPlotConfig, renormalizationWindowDays, backgroundPersistInterval, modelSnapshotRetentionDays,
resultsRetentionDays, customSettings, modelSnapshotId, resultsIndexName, deleted);
}
public Job build(Date createTime) {
setCreateTime(createTime);
return build();
}
public Job build() {
Date createTime;
@ -665,9 +816,9 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
}
checkValidBackgroundPersistInterval();
checkValueNotLessThan(0, "renormalizationWindowDays", renormalizationWindowDays);
checkValueNotLessThan(0, "modelSnapshotRetentionDays", modelSnapshotRetentionDays);
checkValueNotLessThan(0, "resultsRetentionDays", resultsRetentionDays);
checkValueNotLessThan(0, RENORMALIZATION_WINDOW_DAYS.getPreferredName(), renormalizationWindowDays);
checkValueNotLessThan(0, MODEL_SNAPSHOT_RETENTION_DAYS.getPreferredName(), modelSnapshotRetentionDays);
checkValueNotLessThan(0, RESULTS_RETENTION_DAYS.getPreferredName(), resultsRetentionDays);
if (!MlStrings.isValidId(id)) {
throw new IllegalArgumentException(Messages.getMessage(Messages.INVALID_ID, ID.getPreferredName(), id));
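
The net effect for callers shows up in the test changes that follow: nothing sets the create time on the builder any more; the time is passed to build(Date) at the point the Job is materialised (on the server, that point is JobManager.putJob). A minimal usage sketch, assuming the x-pack-elasticsearch classes from this commit are on the classpath (the class name BuildWithCreateTimeExample is made up for illustration):

import java.util.Collections;
import java.util.Date;

import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.config.Detector;
import org.elasticsearch.xpack.ml.job.config.Job;

public class BuildWithCreateTimeExample {
    public static void main(String[] args) {
        // Same shape as the updated tests: configure the builder, then stamp
        // the create time at build() time instead of calling setCreateTime.
        Detector.Builder detector = new Detector.Builder("info_content", "domain");
        detector.setOverFieldName("client");
        AnalysisConfig.Builder analysisConfig =
                new AnalysisConfig.Builder(Collections.singletonList(detector.build()));

        Job job = new Job.Builder("foo")
                .setAnalysisConfig(analysisConfig)
                .build(new Date());   // on the server this is "now"

        System.out.println(job.getId() + " created at " + job.getCreateTime());
    }
}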

View File

@ -59,7 +59,7 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase {
try (TransportClient client = new TestXPackTransportClient(internalCluster().transportClient().settings())) {
client.addTransportAddress(internalCluster().getDataNodeInstance(Transport.class).boundAddress().publishAddress());
PlainListenableActionFuture<PutJobAction.Response> listener = new PlainListenableActionFuture<>(client.threadPool());
new MachineLearningClient(client).putJob(new PutJobAction.Request(createJob("foo").build()), listener);
new MachineLearningClient(client).putJob(new PutJobAction.Request(createJob("foo")), listener);
listener.actionGet();
fail("put job action should not be enabled!");
} catch (ElasticsearchSecurityException e) {
@ -76,7 +76,7 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase {
try (TransportClient client = new TestXPackTransportClient(internalCluster().transportClient().settings())) {
client.addTransportAddress(internalCluster().getDataNodeInstance(Transport.class).boundAddress().publishAddress());
PlainListenableActionFuture<PutJobAction.Response> listener = new PlainListenableActionFuture<>(client.threadPool());
new MachineLearningClient(client).putJob(new PutJobAction.Request(createJob("foo").build()), listener);
new MachineLearningClient(client).putJob(new PutJobAction.Request(createJob("foo")), listener);
PutJobAction.Response response = listener.actionGet();
assertNotNull(response);
}
@ -89,7 +89,7 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase {
try (TransportClient client = new TestXPackTransportClient(internalCluster().transportClient().settings())) {
client.addTransportAddress(internalCluster().getDataNodeInstance(Transport.class).boundAddress().publishAddress());
PlainListenableActionFuture<PutJobAction.Response> putJobListener = new PlainListenableActionFuture<>(client.threadPool());
new MachineLearningClient(client).putJob(new PutJobAction.Request(createJob("foo").build()), putJobListener);
new MachineLearningClient(client).putJob(new PutJobAction.Request(createJob("foo")), putJobListener);
PutJobAction.Response response = putJobListener.actionGet();
assertNotNull(response);
}
@ -139,7 +139,7 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase {
try (TransportClient client = new TestXPackTransportClient(internalCluster().transportClient().settings())) {
client.addTransportAddress(internalCluster().getDataNodeInstance(Transport.class).boundAddress().publishAddress());
PlainListenableActionFuture<PutJobAction.Response> putJobListener = new PlainListenableActionFuture<>(client.threadPool());
new MachineLearningClient(client).putJob(new PutJobAction.Request(createJob("foo").build()), putJobListener);
new MachineLearningClient(client).putJob(new PutJobAction.Request(createJob("foo")), putJobListener);
PutJobAction.Response putJobResponse = putJobListener.actionGet();
assertNotNull(putJobResponse);
}
@ -183,7 +183,7 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase {
client.addTransportAddress(internalCluster().getDataNodeInstance(Transport.class).boundAddress().publishAddress());
// put job
PlainListenableActionFuture<PutJobAction.Response> putJobListener = new PlainListenableActionFuture<>(client.threadPool());
new MachineLearningClient(client).putJob(new PutJobAction.Request(createJob("foo").build()), putJobListener);
new MachineLearningClient(client).putJob(new PutJobAction.Request(createJob("foo")), putJobListener);
PutJobAction.Response putJobResponse = putJobListener.actionGet();
assertNotNull(putJobResponse);
// put datafeed
@ -280,7 +280,7 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase {
try (TransportClient client = new TestXPackTransportClient(internalCluster().transportClient().settings())) {
client.addTransportAddress(internalCluster().getDataNodeInstance(Transport.class).boundAddress().publishAddress());
PlainListenableActionFuture<PutJobAction.Response> putJobListener = new PlainListenableActionFuture<>(client.threadPool());
new MachineLearningClient(client).putJob(new PutJobAction.Request(createJob("foo").build()), putJobListener);
new MachineLearningClient(client).putJob(new PutJobAction.Request(createJob("foo")), putJobListener);
PutJobAction.Response putJobResponse = putJobListener.actionGet();
assertNotNull(putJobResponse);
PlainListenableActionFuture<PutDatafeedAction.Response> putDatafeedListener = new PlainListenableActionFuture<>(
@ -349,7 +349,7 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase {
try (TransportClient client = new TestXPackTransportClient(internalCluster().transportClient().settings())) {
client.addTransportAddress(internalCluster().getDataNodeInstance(Transport.class).boundAddress().publishAddress());
PlainListenableActionFuture<PutJobAction.Response> putJobListener = new PlainListenableActionFuture<>(client.threadPool());
new MachineLearningClient(client).putJob(new PutJobAction.Request(createJob("foo").build()), putJobListener);
new MachineLearningClient(client).putJob(new PutJobAction.Request(createJob("foo")), putJobListener);
PutJobAction.Response putJobResponse = putJobListener.actionGet();
assertNotNull(putJobResponse);
PlainListenableActionFuture<PutDatafeedAction.Response> putDatafeedListener = new PlainListenableActionFuture<>(
@ -399,7 +399,7 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase {
try (TransportClient client = new TestXPackTransportClient(internalCluster().transportClient().settings())) {
client.addTransportAddress(internalCluster().getDataNodeInstance(Transport.class).boundAddress().publishAddress());
PlainListenableActionFuture<PutJobAction.Response> putJobListener = new PlainListenableActionFuture<>(client.threadPool());
new MachineLearningClient(client).putJob(new PutJobAction.Request(createJob("foo").build()), putJobListener);
new MachineLearningClient(client).putJob(new PutJobAction.Request(createJob("foo")), putJobListener);
PutJobAction.Response putJobResponse = putJobListener.actionGet();
assertNotNull(putJobResponse);
PlainListenableActionFuture<OpenJobAction.Response> openJobListener = new PlainListenableActionFuture<>(client.threadPool());
@ -441,7 +441,7 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase {
try (TransportClient client = new TestXPackTransportClient(internalCluster().transportClient().settings())) {
client.addTransportAddress(internalCluster().getDataNodeInstance(Transport.class).boundAddress().publishAddress());
PlainListenableActionFuture<PutJobAction.Response> putJobListener = new PlainListenableActionFuture<>(client.threadPool());
new MachineLearningClient(client).putJob(new PutJobAction.Request(createJob("foo").build()), putJobListener);
new MachineLearningClient(client).putJob(new PutJobAction.Request(createJob("foo")), putJobListener);
PutJobAction.Response putJobResponse = putJobListener.actionGet();
assertNotNull(putJobResponse);
}
@ -465,7 +465,7 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase {
try (TransportClient client = new TestXPackTransportClient(internalCluster().transportClient().settings())) {
client.addTransportAddress(internalCluster().getDataNodeInstance(Transport.class).boundAddress().publishAddress());
PlainListenableActionFuture<PutJobAction.Response> putJobListener = new PlainListenableActionFuture<>(client.threadPool());
new MachineLearningClient(client).putJob(new PutJobAction.Request(createJob("foo").build()), putJobListener);
new MachineLearningClient(client).putJob(new PutJobAction.Request(createJob("foo")), putJobListener);
PutJobAction.Response putJobResponse = putJobListener.actionGet();
assertNotNull(putJobResponse);
PlainListenableActionFuture<PutDatafeedAction.Response> putDatafeedListener = new PlainListenableActionFuture<>(

View File

@ -246,9 +246,8 @@ public class MachineLearningFeatureSetTests extends ESTestCase {
private static Job buildJob(String jobId, List<Detector> detectors) {
AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(detectors);
return new Job.Builder(jobId)
.setCreateTime(new Date(randomNonNegativeLong()))
.setAnalysisConfig(analysisConfig)
.build();
.build(new Date(randomNonNegativeLong()));
}
private static GetJobsStatsAction.Response.JobStats buildJobStats(String jobId, JobState state, long modelBytes) {

View File

@ -31,6 +31,7 @@ import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Persiste
import java.io.IOException;
import java.util.Collections;
import java.util.Date;
import static org.elasticsearch.xpack.ml.action.OpenJobActionTests.createJobTask;
import static org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunnerTests.createDatafeedConfig;
@ -157,7 +158,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase<MlMetadata> {
}
public void testRemoveJob_failDatafeedRefersToJob() {
Job job1 = createDatafeedJob().build();
Job job1 = createDatafeedJob().build(new Date());
DatafeedConfig datafeedConfig1 = createDatafeedConfig("datafeed1", job1.getId()).build();
MlMetadata.Builder builder = new MlMetadata.Builder();
builder.putJob(job1, false);
@ -177,7 +178,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase<MlMetadata> {
}
public void testCrudDatafeed() {
Job job1 = createDatafeedJob().build();
Job job1 = createDatafeedJob().build(new Date());
DatafeedConfig datafeedConfig1 = createDatafeedConfig("datafeed1", job1.getId()).build();
MlMetadata.Builder builder = new MlMetadata.Builder();
builder.putJob(job1, false);
@ -202,7 +203,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase<MlMetadata> {
}
public void testPutDatafeed_failBecauseDatafeedIdIsAlreadyTaken() {
Job job1 = createDatafeedJob().build();
Job job1 = createDatafeedJob().build(new Date());
DatafeedConfig datafeedConfig1 = createDatafeedConfig("datafeed1", job1.getId()).build();
MlMetadata.Builder builder = new MlMetadata.Builder();
builder.putJob(job1, false);
@ -212,7 +213,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase<MlMetadata> {
}
public void testPutDatafeed_failBecauseJobAlreadyHasDatafeed() {
Job job1 = createDatafeedJob().build();
Job job1 = createDatafeedJob().build(new Date());
DatafeedConfig datafeedConfig1 = createDatafeedConfig("datafeed1", job1.getId()).build();
DatafeedConfig datafeedConfig2 = createDatafeedConfig("datafeed2", job1.getId()).build();
MlMetadata.Builder builder = new MlMetadata.Builder();
@ -226,18 +227,19 @@ public class MlMetadataTests extends AbstractSerializingTestCase<MlMetadata> {
public void testPutDatafeed_failBecauseJobIsNotCompatibleForDatafeed() {
Job.Builder job1 = createDatafeedJob();
AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(job1.build().getAnalysisConfig());
Date now = new Date();
AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(job1.build(now).getAnalysisConfig());
analysisConfig.setLatency(TimeValue.timeValueHours(1));
job1.setAnalysisConfig(analysisConfig);
DatafeedConfig datafeedConfig1 = createDatafeedConfig("datafeed1", job1.getId()).build();
MlMetadata.Builder builder = new MlMetadata.Builder();
builder.putJob(job1.build(), false);
builder.putJob(job1.build(now), false);
expectThrows(IllegalArgumentException.class, () -> builder.putDatafeed(datafeedConfig1));
}
public void testUpdateDatafeed() {
Job job1 = createDatafeedJob().build();
Job job1 = createDatafeedJob().build(new Date());
DatafeedConfig datafeedConfig1 = createDatafeedConfig("datafeed1", job1.getId()).build();
MlMetadata.Builder builder = new MlMetadata.Builder();
builder.putJob(job1, false);
@ -262,7 +264,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase<MlMetadata> {
}
public void testUpdateDatafeed_failBecauseDatafeedIsNotStopped() {
Job job1 = createDatafeedJob().build();
Job job1 = createDatafeedJob().build(new Date());
DatafeedConfig datafeedConfig1 = createDatafeedConfig("datafeed1", job1.getId()).build();
MlMetadata.Builder builder = new MlMetadata.Builder();
builder.putJob(job1, false);
@ -284,7 +286,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase<MlMetadata> {
}
public void testUpdateDatafeed_failBecauseNewJobIdDoesNotExist() {
Job job1 = createDatafeedJob().build();
Job job1 = createDatafeedJob().build(new Date());
DatafeedConfig datafeedConfig1 = createDatafeedConfig("datafeed1", job1.getId()).build();
MlMetadata.Builder builder = new MlMetadata.Builder();
builder.putJob(job1, false);
@ -299,7 +301,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase<MlMetadata> {
}
public void testUpdateDatafeed_failBecauseNewJobHasAnotherDatafeedAttached() {
Job job1 = createDatafeedJob().build();
Job job1 = createDatafeedJob().build(new Date());
Job.Builder job2 = new Job.Builder(job1);
job2.setId(job1.getId() + "_2");
DatafeedConfig datafeedConfig1 = createDatafeedConfig("datafeed1", job1.getId()).build();
@ -321,7 +323,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase<MlMetadata> {
}
public void testRemoveDatafeed_failBecauseDatafeedStarted() {
Job job1 = createDatafeedJob().build();
Job job1 = createDatafeedJob().build(new Date());
DatafeedConfig datafeedConfig1 = createDatafeedConfig("datafeed1", job1.getId()).build();
MlMetadata.Builder builder = new MlMetadata.Builder();
builder.putJob(job1, false);

View File

@ -22,6 +22,7 @@ import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
@ -53,7 +54,7 @@ public class CloseJobActionRequestTests extends AbstractStreamableXContentTestCa
public void testValidate() {
MlMetadata.Builder mlBuilder = new MlMetadata.Builder();
mlBuilder.putJob(BaseMlIntegTestCase.createScheduledJob("job_id").build(), false);
mlBuilder.putJob(BaseMlIntegTestCase.createScheduledJob("job_id").build(new Date()), false);
mlBuilder.putDatafeed(BaseMlIntegTestCase.createDatafeed("datafeed_id", "job_id",
Collections.singletonList("*")));
Map<Long, PersistentTask<?>> tasks = new HashMap<>();

View File

@ -132,7 +132,7 @@ public class DatafeedJobsIT extends SecurityIntegTestCase {
indexDocs(logger, "data-2", numDocs2, oneWeekAgo, now);
Job.Builder job = createScheduledJob("lookback-job");
PutJobAction.Request putJobRequest = new PutJobAction.Request(job.build());
PutJobAction.Request putJobRequest = new PutJobAction.Request(job);
PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).get();
assertTrue(putJobResponse.isAcknowledged());
client().execute(OpenJobAction.INSTANCE, new OpenJobAction.Request(job.getId()));
@ -171,7 +171,7 @@ public class DatafeedJobsIT extends SecurityIntegTestCase {
indexDocs(logger, "data", numDocs1, lastWeek, now);
Job.Builder job = createScheduledJob("realtime-job");
PutJobAction.Request putJobRequest = new PutJobAction.Request(job.build());
PutJobAction.Request putJobRequest = new PutJobAction.Request(job);
PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).get();
assertTrue(putJobResponse.isAcknowledged());
client().execute(OpenJobAction.INSTANCE, new OpenJobAction.Request(job.getId()));

View File

@ -39,6 +39,7 @@ import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Persiste
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -313,7 +314,7 @@ public class OpenJobActionTests extends ESTestCase {
MlMetadata.Builder mlMetadata = new MlMetadata.Builder();
for (String jobId : jobIds) {
Job job = BaseMlIntegTestCase.createFareQuoteJob(jobId).build();
Job job = BaseMlIntegTestCase.createFareQuoteJob(jobId).build(new Date());
mlMetadata.putJob(job, false);
}
metaData.putCustom(MlMetadata.TYPE, mlMetadata.build());

View File

@ -23,7 +23,7 @@ public class PutJobActionRequestTests extends AbstractStreamableXContentTestCase
@Override
protected Request createTestInstance() {
Job.Builder jobConfiguration = buildJobBuilder(jobId, date);
return new Request(jobConfiguration.build());
return new Request(jobConfiguration);
}
@Override

View File

@ -29,6 +29,7 @@ import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Persiste
import java.net.InetAddress;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
@ -42,7 +43,7 @@ public class StartDatafeedActionTests extends ESTestCase {
public void testSelectNode() throws Exception {
MlMetadata.Builder mlMetadata = new MlMetadata.Builder();
Job job = createScheduledJob("job_id").build();
Job job = createScheduledJob("job_id").build(new Date());
mlMetadata.putJob(job, false);
mlMetadata.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("*")));
@ -77,7 +78,7 @@ public class StartDatafeedActionTests extends ESTestCase {
public void testSelectNode_jobTaskStale() {
MlMetadata.Builder mlMetadata = new MlMetadata.Builder();
Job job = createScheduledJob("job_id").build();
Job job = createScheduledJob("job_id").build(new Date());
mlMetadata.putJob(job, false);
mlMetadata.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("*")));
@ -111,7 +112,7 @@ public class StartDatafeedActionTests extends ESTestCase {
}
public void testValidate() {
Job job1 = DatafeedJobRunnerTests.createDatafeedJob().build();
Job job1 = DatafeedJobRunnerTests.createDatafeedJob().build(new Date());
MlMetadata mlMetadata1 = new MlMetadata.Builder()
.putJob(job1, false)
.build();
@ -121,7 +122,7 @@ public class StartDatafeedActionTests extends ESTestCase {
}
public void testValidate_jobClosed() {
Job job1 = DatafeedJobRunnerTests.createDatafeedJob().build();
Job job1 = DatafeedJobRunnerTests.createDatafeedJob().build(new Date());
MlMetadata mlMetadata1 = new MlMetadata.Builder()
.putJob(job1, false)
.build();
@ -138,7 +139,7 @@ public class StartDatafeedActionTests extends ESTestCase {
}
public void testValidate_dataFeedAlreadyStarted() {
Job job1 = createScheduledJob("job_id").build();
Job job1 = createScheduledJob("job_id").build(new Date());
DatafeedConfig datafeedConfig = createDatafeed("datafeed_id", "job_id", Collections.singletonList("*"));
MlMetadata mlMetadata1 = new MlMetadata.Builder()
.putJob(job1, false)

View File

@ -20,6 +20,7 @@ import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
import java.util.Collections;
import java.util.Date;
import static org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunnerTests.createDatafeedConfig;
import static org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunnerTests.createDatafeedJob;
@ -52,7 +53,7 @@ public class StopDatafeedActionRequestTests extends AbstractStreamableXContentTe
task = new PersistentTask<>(task, DatafeedState.STARTED);
PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap(1L, task));
Job job = createDatafeedJob().build();
Job job = createDatafeedJob().build(new Date());
MlMetadata mlMetadata1 = new MlMetadata.Builder().putJob(job, false).build();
Exception e = expectThrows(ResourceNotFoundException.class,
() -> StopDatafeedAction.validateAndReturnNodeId("foo", mlMetadata1, tasks));
@ -75,7 +76,7 @@ public class StopDatafeedActionRequestTests extends AbstractStreamableXContentTe
tasks = randomBoolean() ? null : new PersistentTasksCustomMetaData(0L, Collections.emptyMap());
}
Job job = createDatafeedJob().build();
Job job = createDatafeedJob().build(new Date());
DatafeedConfig datafeedConfig = createDatafeedConfig("foo", "job_id").build();
MlMetadata mlMetadata1 = new MlMetadata.Builder()
.putJob(job, false)

View File

@ -91,7 +91,7 @@ public class DatafeedJobRunnerTests extends ESTestCase {
@SuppressWarnings("unchecked")
public void setUpTests() {
MlMetadata.Builder mlMetadata = new MlMetadata.Builder();
Job job = createDatafeedJob().build();
Job job = createDatafeedJob().build(new Date());
mlMetadata.putJob(job, false);
mlMetadata.putDatafeed(createDatafeedConfig("datafeed_id", job.getId()).build());
PersistentTask<OpenJobAction.Request> task = createJobTask(0L, job.getId(), "node_id", JobState.OPENED);
@ -228,7 +228,7 @@ public class DatafeedJobRunnerTests extends ESTestCase {
currentTime = 6000000;
Job.Builder jobBuilder = createDatafeedJob();
DatafeedConfig datafeedConfig = createDatafeedConfig("datafeed1", "job_id").build();
Job job = jobBuilder.build();
Job job = jobBuilder.build(new Date());
MlMetadata mlMetadata = new MlMetadata.Builder()
.putJob(job, false)
.putDatafeed(datafeedConfig)
@ -305,7 +305,6 @@ public class DatafeedJobRunnerTests extends ESTestCase {
Job.Builder builder = new Job.Builder("job_id");
builder.setAnalysisConfig(acBuilder);
builder.setCreateTime(new Date());
return builder;
}

View File

@ -29,7 +29,7 @@ public class DatafeedJobValidatorTests extends ESTestCase {
ac.setBucketSpan(TimeValue.timeValueSeconds(1800));
ac.setLatency(TimeValue.timeValueSeconds(3600));
builder.setAnalysisConfig(ac);
Job job = builder.build();
Job job = builder.build(new Date());
DatafeedConfig datafeedConfig = createValidDatafeedConfig().build();
IllegalArgumentException e = ESTestCase.expectThrows(IllegalArgumentException.class,
@ -44,7 +44,7 @@ public class DatafeedJobValidatorTests extends ESTestCase {
ac.setBucketSpan(TimeValue.timeValueSeconds(1800));
ac.setLatency(TimeValue.ZERO);
builder.setAnalysisConfig(ac);
Job job = builder.build();
Job job = builder.build(new Date());
DatafeedConfig datafeedConfig = createValidDatafeedConfig().build();
DatafeedJobValidator.validate(datafeedConfig, job);
@ -56,7 +56,7 @@ public class DatafeedJobValidatorTests extends ESTestCase {
ac.setBatchSpan(TimeValue.timeValueSeconds(1800));
ac.setBucketSpan(TimeValue.timeValueSeconds(100));
builder.setAnalysisConfig(ac);
Job job = builder.build();
Job job = builder.build(new Date());
DatafeedConfig datafeedConfig = createValidDatafeedConfig().build();
DatafeedJobValidator.validate(datafeedConfig, job);
@ -70,7 +70,7 @@ public class DatafeedJobValidatorTests extends ESTestCase {
ac.setSummaryCountFieldName(null);
ac.setBucketSpan(TimeValue.timeValueSeconds(1800));
builder.setAnalysisConfig(ac);
Job job = builder.build();
Job job = builder.build(new Date());
DatafeedConfig datafeedConfig = createValidDatafeedConfigWithAggs(1800.0).build();
IllegalArgumentException e = ESTestCase.expectThrows(IllegalArgumentException.class,
@ -87,7 +87,7 @@ public class DatafeedJobValidatorTests extends ESTestCase {
ac.setSummaryCountFieldName("");
ac.setBucketSpan(TimeValue.timeValueSeconds(1800));
builder.setAnalysisConfig(ac);
Job job = builder.build();
Job job = builder.build(new Date());
DatafeedConfig datafeedConfig = createValidDatafeedConfigWithAggs(1800.0).build();
IllegalArgumentException e = ESTestCase.expectThrows(IllegalArgumentException.class,
@ -102,9 +102,8 @@ public class DatafeedJobValidatorTests extends ESTestCase {
ac.setSummaryCountFieldName("some_count");
ac.setBucketSpan(TimeValue.timeValueSeconds(1800));
builder.setAnalysisConfig(ac);
Job job = builder.build();
Job job = builder.build(new Date());
DatafeedConfig datafeedConfig = createValidDatafeedConfigWithAggs(900.0).build();
DatafeedJobValidator.validate(datafeedConfig, job);
}
@ -114,7 +113,7 @@ public class DatafeedJobValidatorTests extends ESTestCase {
ac.setSummaryCountFieldName("some_count");
ac.setBucketSpan(TimeValue.timeValueSeconds(1800));
builder.setAnalysisConfig(ac);
Job job = builder.build();
Job job = builder.build(new Date());
DatafeedConfig datafeedConfig = createValidDatafeedConfigWithAggs(1800001.0).build();
IllegalArgumentException e = ESTestCase.expectThrows(IllegalArgumentException.class,
@ -123,9 +122,8 @@ public class DatafeedJobValidatorTests extends ESTestCase {
assertEquals("Aggregation interval [1800001ms] must be less than or equal to the bucket_span [1800000ms]", e.getMessage());
}
public static Job.Builder buildJobBuilder(String id) {
private static Job.Builder buildJobBuilder(String id) {
Job.Builder builder = new Job.Builder(id);
builder.setCreateTime(new Date());
AnalysisConfig.Builder ac = createAnalysisConfig();
builder.setAnalysisConfig(ac);
return builder;

View File

@ -19,6 +19,8 @@ import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.junit.Before;
import java.util.Date;
import static org.hamcrest.Matchers.instanceOf;
import static org.mockito.Mockito.mock;
@ -38,7 +40,8 @@ public class DataExtractorFactoryTests extends ESTestCase {
jobBuilder.setDataDescription(dataDescription);
DatafeedConfig datafeedConfig = DatafeedJobRunnerTests.createDatafeedConfig("datafeed1", "foo").build();
DataExtractorFactory dataExtractorFactory = DataExtractorFactory.create(client, datafeedConfig, jobBuilder.build());
DataExtractorFactory dataExtractorFactory =
DataExtractorFactory.create(client, datafeedConfig, jobBuilder.build(new Date()));
assertThat(dataExtractorFactory, instanceOf(ChunkedDataExtractorFactory.class));
}
@ -51,7 +54,8 @@ public class DataExtractorFactoryTests extends ESTestCase {
DatafeedConfig.Builder datafeedConfig = DatafeedJobRunnerTests.createDatafeedConfig("datafeed1", "foo");
datafeedConfig.setChunkingConfig(ChunkingConfig.newAuto());
DataExtractorFactory dataExtractorFactory = DataExtractorFactory.create(client, datafeedConfig.build(), jobBuilder.build());
DataExtractorFactory dataExtractorFactory =
DataExtractorFactory.create(client, datafeedConfig.build(), jobBuilder.build(new Date()));
assertThat(dataExtractorFactory, instanceOf(ChunkedDataExtractorFactory.class));
}
@ -64,7 +68,8 @@ public class DataExtractorFactoryTests extends ESTestCase {
DatafeedConfig.Builder datafeedConfig = DatafeedJobRunnerTests.createDatafeedConfig("datafeed1", "foo");
datafeedConfig.setChunkingConfig(ChunkingConfig.newOff());
DataExtractorFactory dataExtractorFactory = DataExtractorFactory.create(client, datafeedConfig.build(), jobBuilder.build());
DataExtractorFactory dataExtractorFactory =
DataExtractorFactory.create(client, datafeedConfig.build(), jobBuilder.build(new Date()));
assertThat(dataExtractorFactory, instanceOf(ScrollDataExtractorFactory.class));
}
@ -78,7 +83,8 @@ public class DataExtractorFactoryTests extends ESTestCase {
datafeedConfig.setAggregations(AggregatorFactories.builder().addAggregator(
AggregationBuilders.histogram("time").interval(300000)));
DataExtractorFactory dataExtractorFactory = DataExtractorFactory.create(client, datafeedConfig.build(), jobBuilder.build());
DataExtractorFactory dataExtractorFactory =
DataExtractorFactory.create(client, datafeedConfig.build(), jobBuilder.build(new Date()));
assertThat(dataExtractorFactory, instanceOf(AggregationDataExtractorFactory.class));
}
@ -93,7 +99,8 @@ public class DataExtractorFactoryTests extends ESTestCase {
AggregationBuilders.histogram("time").interval(300000)));
datafeedConfig.setChunkingConfig(ChunkingConfig.newAuto());
DataExtractorFactory dataExtractorFactory = DataExtractorFactory.create(client, datafeedConfig.build(), jobBuilder.build());
DataExtractorFactory dataExtractorFactory =
DataExtractorFactory.create(client, datafeedConfig.build(), jobBuilder.build(new Date()));
assertThat(dataExtractorFactory, instanceOf(ChunkedDataExtractorFactory.class));
}

View File

@ -50,7 +50,7 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
ensureStableCluster(4);
Job.Builder job = createJob("job_id");
PutJobAction.Request putJobRequest = new PutJobAction.Request(job.build());
PutJobAction.Request putJobRequest = new PutJobAction.Request(job);
PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).actionGet();
assertTrue(putJobResponse.isAcknowledged());
ensureGreen();
@ -86,7 +86,7 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
ensureStableCluster(4);
Job.Builder job = createScheduledJob("job_id");
PutJobAction.Request putJobRequest = new PutJobAction.Request(job.build());
PutJobAction.Request putJobRequest = new PutJobAction.Request(job);
PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).actionGet();
assertTrue(putJobResponse.isAcknowledged());
DatafeedConfig.Builder configBuilder = createDatafeedBuilder("data_feed_id", job.getId(), Collections.singletonList("*"));
@ -160,7 +160,7 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
ensureStableCluster(3);
Job.Builder job = createJob("job_id");
PutJobAction.Request putJobRequest = new PutJobAction.Request(job.build());
PutJobAction.Request putJobRequest = new PutJobAction.Request(job);
PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).actionGet();
assertTrue(putJobResponse.isAcknowledged());
@ -240,7 +240,7 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
int numJobs = numMlNodes * 10;
for (int i = 0; i < numJobs; i++) {
Job.Builder job = createJob(Integer.toString(i));
PutJobAction.Request putJobRequest = new PutJobAction.Request(job.build());
PutJobAction.Request putJobRequest = new PutJobAction.Request(job);
PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).actionGet();
assertTrue(putJobResponse.isAcknowledged());
@ -316,7 +316,7 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
ensureStableCluster(2);
Job.Builder job = createFareQuoteJob("job_id");
PutJobAction.Request putJobRequest = new PutJobAction.Request(job.build());
PutJobAction.Request putJobRequest = new PutJobAction.Request(job);
PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).actionGet();
assertTrue(putJobResponse.isAcknowledged());

View File

@ -61,7 +61,7 @@ public class DeleteExpiredDataIT extends SecurityIntegTestCase {
private static final String DATA_INDEX = "delete-expired-data-test-data";
private static final String DATA_TYPE = "my_type";
private List<Job> jobs;
private List<Job.Builder> jobs;
@Override
protected Settings externalClusterClientSettings() {
@ -109,7 +109,7 @@ public class DeleteExpiredDataIT extends SecurityIntegTestCase {
@After
public void tearDownData() throws Exception {
client().admin().indices().prepareDelete(DATA_INDEX).get();
for (Job job : jobs) {
for (Job.Builder job : jobs) {
DeleteDatafeedAction.Request deleteDatafeedRequest = new DeleteDatafeedAction.Request(job.getId() + "-feed");
client().execute(DeleteDatafeedAction.INSTANCE, deleteDatafeedRequest).get();
DeleteJobAction.Request deleteJobRequest = new DeleteJobAction.Request(job.getId());
@ -118,15 +118,15 @@ public class DeleteExpiredDataIT extends SecurityIntegTestCase {
}
public void testDeleteExpiredData() throws Exception {
jobs.add(newJobBuilder("no-retention").setResultsRetentionDays(null).setModelSnapshotRetentionDays(null).build());
jobs.add(newJobBuilder("results-retention").setResultsRetentionDays(1L).setModelSnapshotRetentionDays(null).build());
jobs.add(newJobBuilder("snapshots-retention").setResultsRetentionDays(null).setModelSnapshotRetentionDays(2L).build());
jobs.add(newJobBuilder("snapshots-retention-with-retain").setResultsRetentionDays(null).setModelSnapshotRetentionDays(2L).build());
jobs.add(newJobBuilder("results-and-snapshots-retention").setResultsRetentionDays(1L).setModelSnapshotRetentionDays(2L).build());
jobs.add(newJobBuilder("no-retention").setResultsRetentionDays(null).setModelSnapshotRetentionDays(null));
jobs.add(newJobBuilder("results-retention").setResultsRetentionDays(1L).setModelSnapshotRetentionDays(null));
jobs.add(newJobBuilder("snapshots-retention").setResultsRetentionDays(null).setModelSnapshotRetentionDays(2L));
jobs.add(newJobBuilder("snapshots-retention-with-retain").setResultsRetentionDays(null).setModelSnapshotRetentionDays(2L));
jobs.add(newJobBuilder("results-and-snapshots-retention").setResultsRetentionDays(1L).setModelSnapshotRetentionDays(2L));
long now = System.currentTimeMillis();
long oneDayAgo = now - TimeValue.timeValueHours(48).getMillis() - 1;
for (Job job : jobs) {
for (Job.Builder job : jobs) {
PutJobAction.Request putJobRequest = new PutJobAction.Request(job);
client().execute(PutJobAction.INSTANCE, putJobRequest).get();
@ -160,7 +160,7 @@ public class DeleteExpiredDataIT extends SecurityIntegTestCase {
// We need to wait a second to ensure the second time around model snapshots will have a different ID (it depends on epoch seconds)
awaitBusy(() -> false, 1, TimeUnit.SECONDS);
for (Job job : jobs) {
for (Job.Builder job : jobs) {
// Run up to now
openJob(job.getId());
startDatafeed(job.getId() + "-feed", 0, now);
@ -230,7 +230,6 @@ public class DeleteExpiredDataIT extends SecurityIntegTestCase {
Job.Builder jobBuilder = new Job.Builder(id);
jobBuilder.setAnalysisConfig(analysisConfig);
jobBuilder.setDataDescription(dataDescription);
jobBuilder.setCreateTime(new Date());
return jobBuilder;
}

View File

@ -106,7 +106,7 @@ public class MlDistributedFailureIT extends BaseMlIntegTestCase {
indexDocs(logger, "data", numDocs1, twoWeeksAgo, weekAgo);
Job.Builder job = createScheduledJob("job_id");
PutJobAction.Request putJobRequest = new PutJobAction.Request(job.build());
PutJobAction.Request putJobRequest = new PutJobAction.Request(job);
PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).actionGet();
assertTrue(putJobResponse.isAcknowledged());

View File

@ -26,7 +26,7 @@ public class TooManyJobsIT extends BaseMlIntegTestCase {
// create and open first job, which succeeds:
Job.Builder job = createJob("1");
PutJobAction.Request putJobRequest = new PutJobAction.Request(job.build());
PutJobAction.Request putJobRequest = new PutJobAction.Request(job);
PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).get();
assertTrue(putJobResponse.isAcknowledged());
client().execute(OpenJobAction.INSTANCE, new OpenJobAction.Request(job.getId())).get();
@ -38,7 +38,7 @@ public class TooManyJobsIT extends BaseMlIntegTestCase {
// create and try to open second job, which fails:
job = createJob("2");
putJobRequest = new PutJobAction.Request(job.build());
putJobRequest = new PutJobAction.Request(job);
putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).get();
assertTrue(putJobResponse.isAcknowledged());
expectThrows(ElasticsearchStatusException.class,
@ -71,7 +71,7 @@ public class TooManyJobsIT extends BaseMlIntegTestCase {
int clusterWideMaxNumberOfJobs = numNodes * maxNumberOfJobsPerNode;
for (int i = 1; i <= (clusterWideMaxNumberOfJobs + 1); i++) {
Job.Builder job = createJob(Integer.toString(i));
PutJobAction.Request putJobRequest = new PutJobAction.Request(job.build());
PutJobAction.Request putJobRequest = new PutJobAction.Request(job);
PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).get();
assertTrue(putJobResponse.isAcknowledged());

View File

@ -6,7 +6,9 @@
package org.elasticsearch.xpack.ml.job;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
@ -15,15 +17,27 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.action.PutJobAction;
import org.elasticsearch.xpack.ml.action.util.QueryPage;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.config.Detector;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.junit.Before;
import org.mockito.ArgumentCaptor;
import org.mockito.Matchers;
import java.util.Collections;
import java.util.Date;
import static org.elasticsearch.xpack.ml.job.config.JobTests.buildJobBuilder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
public class JobManagerTests extends ESTestCase {
@ -103,6 +117,53 @@ public class JobManagerTests extends ESTestCase {
assertThat(result.results().get(9).getId(), equalTo("9"));
}
@SuppressWarnings("unchecked")
public void testPutJob_AddsCreateTime() {
JobManager jobManager = createJobManager();
PutJobAction.Request putJobRequest = new PutJobAction.Request(createJob());
doAnswer(invocation -> {
AckedClusterStateUpdateTask<Boolean> task = (AckedClusterStateUpdateTask<Boolean>) invocation.getArguments()[1];
task.onAllNodesAcked(null);
return null;
}).when(clusterService).submitStateUpdateTask(Matchers.eq("put-job-foo"), any(AckedClusterStateUpdateTask.class));
ArgumentCaptor<Job> requestCaptor = ArgumentCaptor.forClass(Job.class);
doAnswer(invocation -> {
ActionListener<Boolean> listener = (ActionListener<Boolean>) invocation.getArguments()[2];
listener.onResponse(true);
return null;
}).when(jobProvider).createJobResultIndex(requestCaptor.capture(), any(ClusterState.class), any(ActionListener.class));
jobManager.putJob(putJobRequest, mock(ClusterState.class), new ActionListener<PutJobAction.Response>() {
@Override
public void onResponse(PutJobAction.Response response) {
Job job = requestCaptor.getValue();
assertNotNull(job.getCreateTime());
Date now = new Date();
// job create time should be within the last second
assertThat(now.getTime(), greaterThanOrEqualTo(job.getCreateTime().getTime()));
assertThat(now.getTime() - 1000, lessThanOrEqualTo(job.getCreateTime().getTime()));
}
@Override
public void onFailure(Exception e) {
fail(e.toString());
}
});
}
private Job.Builder createJob() {
Detector.Builder d1 = new Detector.Builder("info_content", "domain");
d1.setOverFieldName("client");
AnalysisConfig.Builder ac = new AnalysisConfig.Builder(Collections.singletonList(d1.build()));
Job.Builder builder = new Job.Builder();
builder.setId("foo");
builder.setAnalysisConfig(ac);
return builder;
}
private JobManager createJobManager() {
Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build();
JobResultsPersister jobResultsPersister = mock(JobResultsPersister.class);

View File

@ -0,0 +1,87 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.job.config;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.ml.support.AbstractSerializingTestCase;
import java.util.Collections;
import java.util.Date;
import static org.elasticsearch.xpack.ml.job.config.JobTests.randomValidJobId;
public class JobBuilderTests extends AbstractSerializingTestCase<Job.Builder> {
@Override
protected Job.Builder createTestInstance() {
Job.Builder builder = new Job.Builder();
if (randomBoolean()) {
builder.setId(randomValidJobId());
}
if (randomBoolean()) {
builder.setDescription(randomAsciiOfLength(10));
}
if (randomBoolean()) {
builder.setCreateTime(new Date(randomNonNegativeLong()));
}
if (randomBoolean()) {
builder.setFinishedTime(new Date(randomNonNegativeLong()));
}
if (randomBoolean()) {
builder.setLastDataTime(new Date(randomNonNegativeLong()));
}
if (randomBoolean()) {
builder.setAnalysisConfig(AnalysisConfigTests.createRandomized());
}
if (randomBoolean()) {
builder.setAnalysisLimits(new AnalysisLimits(randomNonNegativeLong(),
randomNonNegativeLong()));
}
if (randomBoolean()) {
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setFormat(randomFrom(DataDescription.DataFormat.values()));
builder.setDataDescription(dataDescription);
}
if (randomBoolean()) {
builder.setModelPlotConfig(new ModelPlotConfig(randomBoolean(),
randomAsciiOfLength(10)));
}
if (randomBoolean()) {
builder.setRenormalizationWindowDays(randomNonNegativeLong());
}
if (randomBoolean()) {
builder.setBackgroundPersistInterval(TimeValue.timeValueHours(randomIntBetween(1, 24)));
}
if (randomBoolean()) {
builder.setModelSnapshotRetentionDays(randomNonNegativeLong());
}
if (randomBoolean()) {
builder.setResultsRetentionDays(randomNonNegativeLong());
}
if (randomBoolean()) {
builder.setCustomSettings(Collections.singletonMap(randomAsciiOfLength(10),
randomAsciiOfLength(10)));
}
if (randomBoolean()) {
builder.setModelSnapshotId(randomAsciiOfLength(10));
}
if (randomBoolean()) {
builder.setResultsIndexName(randomValidJobId());
}
return builder;
}
@Override
protected Writeable.Reader<Job.Builder> instanceReader() {
return Job.Builder::new;
}
@Override
protected Job.Builder parseInstance(XContentParser parser) {
return Job.PARSER.apply(parser, null);
}
}

View File

@ -291,7 +291,7 @@ public class JobTests extends AbstractSerializingTestCase<Job> {
public void testVerify_GivenNegativeRenormalizationWindowDays() {
String errorMessage = Messages.getMessage(Messages.JOB_CONFIG_FIELD_VALUE_TOO_LOW,
"renormalizationWindowDays", 0, -1);
"renormalization_window_days", 0, -1);
Job.Builder builder = buildJobBuilder("foo");
builder.setRenormalizationWindowDays(-1L);
IllegalArgumentException e = ESTestCase.expectThrows(IllegalArgumentException.class, builder::build);
@ -299,7 +299,7 @@ public class JobTests extends AbstractSerializingTestCase<Job> {
}
public void testVerify_GivenNegativeModelSnapshotRetentionDays() {
String errorMessage = Messages.getMessage(Messages.JOB_CONFIG_FIELD_VALUE_TOO_LOW, "modelSnapshotRetentionDays", 0, -1);
String errorMessage = Messages.getMessage(Messages.JOB_CONFIG_FIELD_VALUE_TOO_LOW, "model_snapshot_retention_days", 0, -1);
Job.Builder builder = buildJobBuilder("foo");
builder.setModelSnapshotRetentionDays(-1L);
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, builder::build);
@ -317,7 +317,7 @@ public class JobTests extends AbstractSerializingTestCase<Job> {
public void testVerify_GivenNegativeResultsRetentionDays() {
String errorMessage = Messages.getMessage(Messages.JOB_CONFIG_FIELD_VALUE_TOO_LOW,
"resultsRetentionDays", 0, -1);
"results_retention_days", 0, -1);
Job.Builder builder = buildJobBuilder("foo");
builder.setResultsRetentionDays(-1L);
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, builder::build);
@ -344,6 +344,13 @@ public class JobTests extends AbstractSerializingTestCase<Job> {
assertEquals(Messages.getMessage(Messages.INVALID_ID, Job.RESULTS_INDEX_NAME.getPreferredName(), "_bad^name"), e.getMessage());
}
public void testBuilder_buildWithCreateTime() {
Job.Builder builder = buildJobBuilder("foo");
Date now = new Date();
Job job = builder.build(now);
assertEquals(now, job.getCreateTime());
}
public static Job.Builder buildJobBuilder(String id, Date date) {
Job.Builder builder = new Job.Builder(id);
builder.setCreateTime(date);

View File

@ -54,8 +54,7 @@ public class DataCountsReporterTests extends ESTestCase {
Job.Builder builder = new Job.Builder("sr");
builder.setAnalysisConfig(acBuilder);
builder.setCreateTime(new Date());
job = builder.build();
job = builder.build(new Date());
jobDataCountsPersister = Mockito.mock(JobDataCountsPersister.class);
threadPool = Mockito.mock(ThreadPool.class);

View File

@ -29,8 +29,7 @@ public class DataStreamDiagnosticsTests extends ESTestCase {
Job.Builder builder = new Job.Builder("job_id");
builder.setAnalysisConfig(acBuilder);
builder.setCreateTime(new Date());
job = builder.build();
job = builder.build(new Date());
}
public void testSimple() {

View File

@ -64,7 +64,6 @@ class DummyDataCountsReporter extends DataCountsReporter {
Job.Builder builder = new Job.Builder("dummy_job_id");
builder.setAnalysisConfig(acBuilder);
builder.setCreateTime(new Date());
return builder.build();
return builder.build(new Date());
}
}

View File

@ -110,8 +110,7 @@ public class AutodetectCommunicatorTests extends ESTestCase {
builder.setDataDescription(dd);
builder.setAnalysisConfig(ac);
builder.setCreateTime(new Date());
return builder.build();
return builder.build(new Date());
}
private AutodetectProcess mockAutodetectProcessWithOutputStream() throws IOException {

View File

@ -385,9 +385,8 @@ public class AutodetectProcessManagerTests extends ESTestCase {
Job.Builder builder = new Job.Builder(jobId);
builder.setDataDescription(dd);
builder.setAnalysisConfig(ac);
builder.setCreateTime(new Date());
return builder.build();
return builder.build(new Date());
}
private static InputStream createInputStream(String input) {

View File

@ -76,9 +76,8 @@ public class ScoresUpdaterTests extends ESTestCase {
AnalysisConfig.Builder configBuilder = new AnalysisConfig.Builder(detectors);
configBuilder.setBucketSpan(TimeValue.timeValueSeconds(DEFAULT_BUCKET_SPAN));
jobBuilder.setAnalysisConfig(configBuilder);
jobBuilder.setCreateTime(new Date());
job = jobBuilder.build();
job = jobBuilder.build(new Date());
scoresUpdater = new ScoresUpdater(job, jobProvider, jobRenormalizedResultsPersister, normalizerFactory);

View File

@ -124,7 +124,6 @@ public abstract class BaseMlIntegTestCase extends ESIntegTestCase {
Job.Builder builder = new Job.Builder();
builder.setId(id);
builder.setCreateTime(new Date());
builder.setAnalysisConfig(analysisConfig);
builder.setDataDescription(dataDescription);
@ -144,8 +143,6 @@ public abstract class BaseMlIntegTestCase extends ESIntegTestCase {
Job.Builder builder = new Job.Builder();
builder.setId(id);
builder.setCreateTime(new Date());
builder.setAnalysisConfig(analysisConfig);
builder.setDataDescription(dataDescription);
return builder;
@ -164,7 +161,6 @@ public abstract class BaseMlIntegTestCase extends ESIntegTestCase {
builder.setAnalysisConfig(analysisConfig);
builder.setDataDescription(dataDescription);
builder.setCreateTime(new Date());
return builder;
}

View File

@ -30,7 +30,6 @@ public class MLTransportClientIT extends ESXPackSmokeClientTestCase {
MachineLearningClient mlClient = xPackClient.machineLearning();
Job.Builder job = new Job.Builder();
job.setId("test");
job.setCreateTime(new Date());
List<Detector> detectors = new ArrayList<>();
Detector.Builder detector = new Detector.Builder();
@ -42,7 +41,7 @@ public class MLTransportClientIT extends ESXPackSmokeClientTestCase {
job.setAnalysisConfig(analysisConfig);
PutJobAction.Response putJobResponse = mlClient
.putJob(new PutJobAction.Request(job.build()))
.putJob(new PutJobAction.Request(job))
.actionGet();
assertThat(putJobResponse, notNullValue());