From bf74c77fef6a5bea3489fd0e340c4f5ff6d83a3c Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Thu, 21 Dec 2017 07:47:32 +0100 Subject: [PATCH] [ML] allow forecast only on post 6.1 jobs (elastic/x-pack-elasticsearch#3362) allow forecast only on post 6.1 jobs discuss issue: elastic/machine-learning-cpp#494 relates elastic/x-pack-elasticsearch#3219 Original commit: elastic/x-pack-elasticsearch@c6884bc40ffc3bed98d747fe52a8906c75360718 --- .../xpack/ml/action/ForecastJobAction.java | 42 +++++++++----- .../xpack/ml/job/config/Job.java | 2 +- .../action/ForecastJobActionRequestTests.java | 56 +++++++++++++++++++ .../xpack/ml/integration/ForecastIT.java | 2 +- 4 files changed, 86 insertions(+), 16 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/ForecastJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/ForecastJobAction.java index e3dc4e94466..7785e146420 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/ForecastJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/ForecastJobAction.java @@ -5,12 +5,14 @@ */ package org.elasticsearch.xpack.ml.action; +import org.elasticsearch.Version; import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.tasks.BaseTasksResponse; import org.elasticsearch.client.ElasticsearchClient; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.ParseField; @@ -31,6 +33,7 @@ import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; import org.elasticsearch.xpack.ml.job.process.autodetect.params.ForecastParams; import org.elasticsearch.xpack.ml.job.results.Forecast; +import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import java.io.IOException; import java.util.Objects; @@ -247,16 +250,13 @@ public class ForecastJobAction extends Action { - private final JobManager jobManager; - @Inject public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, ClusterService clusterService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, - JobManager jobManager, AutodetectProcessManager processManager) { + AutodetectProcessManager processManager) { super(settings, ForecastJobAction.NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, ForecastJobAction.Request::new, ForecastJobAction.Response::new, ThreadPool.Names.SAME, processManager); - this.jobManager = jobManager; // ThreadPool.Names.SAME, because operations is executed by autodetect worker thread } @@ -269,20 +269,16 @@ public class ForecastJobAction extends Action listener) { + ClusterState state = clusterService.state(); + Job job = JobManager.getJobOrThrowIfUnknown(task.getJobId(), state); + validate(job, request); + ForecastParams.Builder paramsBuilder = ForecastParams.builder(); + if (request.getDuration() != null) { - TimeValue duration = request.getDuration(); - TimeValue bucketSpan = jobManager.getJobOrThrowIfUnknown(task.getJobId()).getAnalysisConfig().getBucketSpan(); - - if (duration.compareTo(bucketSpan) < 0) { - throw new IllegalArgumentException( - "[" + DURATION.getPreferredName() - + "] must be greater or equal to the bucket span: [" - + duration.getStringRep() + "/" + bucketSpan.getStringRep() + "]"); - } - paramsBuilder.duration(request.getDuration()); } + if (request.getExpiresIn() != null) { paramsBuilder.expiresIn(request.getExpiresIn()); } @@ -296,6 +292,24 @@ public class ForecastJobAction extends Action implements Writeable, ToXContentO return this; } - Builder setCreateTime(Date createTime) { + public Builder setCreateTime(Date createTime) { this.createTime = createTime; return this; } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/ForecastJobActionRequestTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/ForecastJobActionRequestTests.java index ccf10941fbb..a59502a76ae 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/ForecastJobActionRequestTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/ForecastJobActionRequestTests.java @@ -5,10 +5,19 @@ */ package org.elasticsearch.xpack.ml.action; +import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.Version; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.test.AbstractStreamableXContentTestCase; import org.elasticsearch.xpack.ml.action.ForecastJobAction.Request; +import org.elasticsearch.xpack.ml.job.config.AnalysisConfig; +import org.elasticsearch.xpack.ml.job.config.DataDescription; +import org.elasticsearch.xpack.ml.job.config.Detector; +import org.elasticsearch.xpack.ml.job.config.Job; + +import java.util.Collections; +import java.util.Date; import static org.hamcrest.Matchers.equalTo; @@ -61,4 +70,51 @@ public class ForecastJobActionRequestTests extends AbstractStreamableXContentTes IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> new Request().setExpiresIn("-1s")); assertThat(e.getMessage(), equalTo("[expires_in] must be non-negative: [-1]")); } + + public void testValidate_jobVersionCannonBeBefore61() { + Job.Builder jobBuilder = createTestJob("forecast-it-test-job-version"); + + jobBuilder.setJobVersion(Version.V_6_0_1); + ForecastJobAction.Request request = new ForecastJobAction.Request(); + Exception e = expectThrows(ElasticsearchStatusException.class, + () -> ForecastJobAction.TransportAction.validate(jobBuilder.build(), request)); + assertEquals( + "Cannot run forecast because jobs created prior to version 6.1 are not supported", + e.getMessage()); + } + + public void testValidate_jobVersionCannonBeBefore61NoJobVersion() { + Job.Builder jobBuilder = createTestJob("forecast-it-test-job-version"); + + ForecastJobAction.Request request = new ForecastJobAction.Request(); + Exception e = expectThrows(ElasticsearchStatusException.class, + () -> ForecastJobAction.TransportAction.validate(jobBuilder.build(), request)); + assertEquals( + "Cannot run forecast because jobs created prior to version 6.1 are not supported", + e.getMessage()); + } + + public void testValidate_DurationCannotBeLessThanBucketSpan() { + Job.Builder jobBuilder = createTestJob("forecast-it-test-job-version"); + + ForecastJobAction.Request request = new ForecastJobAction.Request(); + request.setDuration(TimeValue.timeValueMinutes(1)); + Exception e = expectThrows(ElasticsearchStatusException.class, + () -> ForecastJobAction.TransportAction.validate(jobBuilder.build(new Date()), request)); + assertEquals("[duration] must be greater or equal to the bucket span: [1m/1h]", e.getMessage()); + } + + private Job.Builder createTestJob(String jobId) { + Job.Builder jobBuilder = new Job.Builder(jobId); + jobBuilder.setCreateTime(new Date()); + Detector.Builder detector = new Detector.Builder("mean", "value"); + AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(detector.build())); + TimeValue bucketSpan = TimeValue.timeValueHours(1); + analysisConfig.setBucketSpan(bucketSpan); + DataDescription.Builder dataDescription = new DataDescription.Builder(); + dataDescription.setTimeFormat("epoch"); + jobBuilder.setAnalysisConfig(analysisConfig); + jobBuilder.setDataDescription(dataDescription); + return jobBuilder; + } } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/ForecastIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/ForecastIT.java index b411c6d8f67..15cbb6a5fdd 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/ForecastIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/ForecastIT.java @@ -153,7 +153,7 @@ public class ForecastIT extends MlNativeAutodetectIntegTestCase { ElasticsearchException e = expectThrows(ElasticsearchException.class,() -> forecast(job.getId(), TimeValue.timeValueMinutes(10), null)); assertThat(e.getMessage(), - equalTo("java.lang.IllegalArgumentException: [duration] must be greater or equal to the bucket span: [10m/1h]")); + equalTo("[duration] must be greater or equal to the bucket span: [10m/1h]")); } private static Map createRecord(long timestamp, double value) {