[ML] allow forecast only on post 6.1 jobs (elastic/x-pack-elasticsearch#3362)
allow forecast only on post 6.1 jobs discuss issue: elastic/machine-learning-cpp#494 relates elastic/x-pack-elasticsearch#3219 Original commit: elastic/x-pack-elasticsearch@c6884bc40f
This commit is contained in:
parent
1cc73a0307
commit
bf74c77fef
|
@ -5,12 +5,14 @@
|
||||||
*/
|
*/
|
||||||
package org.elasticsearch.xpack.ml.action;
|
package org.elasticsearch.xpack.ml.action;
|
||||||
|
|
||||||
|
import org.elasticsearch.Version;
|
||||||
import org.elasticsearch.action.Action;
|
import org.elasticsearch.action.Action;
|
||||||
import org.elasticsearch.action.ActionListener;
|
import org.elasticsearch.action.ActionListener;
|
||||||
import org.elasticsearch.action.ActionRequestBuilder;
|
import org.elasticsearch.action.ActionRequestBuilder;
|
||||||
import org.elasticsearch.action.support.ActionFilters;
|
import org.elasticsearch.action.support.ActionFilters;
|
||||||
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
|
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
|
||||||
import org.elasticsearch.client.ElasticsearchClient;
|
import org.elasticsearch.client.ElasticsearchClient;
|
||||||
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||||
import org.elasticsearch.cluster.service.ClusterService;
|
import org.elasticsearch.cluster.service.ClusterService;
|
||||||
import org.elasticsearch.common.ParseField;
|
import org.elasticsearch.common.ParseField;
|
||||||
|
@ -31,6 +33,7 @@ import org.elasticsearch.xpack.ml.job.config.Job;
|
||||||
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
|
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
|
||||||
import org.elasticsearch.xpack.ml.job.process.autodetect.params.ForecastParams;
|
import org.elasticsearch.xpack.ml.job.process.autodetect.params.ForecastParams;
|
||||||
import org.elasticsearch.xpack.ml.job.results.Forecast;
|
import org.elasticsearch.xpack.ml.job.results.Forecast;
|
||||||
|
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
|
@ -247,16 +250,13 @@ public class ForecastJobAction extends Action<ForecastJobAction.Request, Forecas
|
||||||
|
|
||||||
public static class TransportAction extends TransportJobTaskAction<Request, Response> {
|
public static class TransportAction extends TransportJobTaskAction<Request, Response> {
|
||||||
|
|
||||||
private final JobManager jobManager;
|
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, ClusterService clusterService,
|
public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, ClusterService clusterService,
|
||||||
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
|
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
|
||||||
JobManager jobManager, AutodetectProcessManager processManager) {
|
AutodetectProcessManager processManager) {
|
||||||
super(settings, ForecastJobAction.NAME, threadPool, clusterService, transportService, actionFilters,
|
super(settings, ForecastJobAction.NAME, threadPool, clusterService, transportService, actionFilters,
|
||||||
indexNameExpressionResolver, ForecastJobAction.Request::new, ForecastJobAction.Response::new, ThreadPool.Names.SAME,
|
indexNameExpressionResolver, ForecastJobAction.Request::new, ForecastJobAction.Response::new, ThreadPool.Names.SAME,
|
||||||
processManager);
|
processManager);
|
||||||
this.jobManager = jobManager;
|
|
||||||
// ThreadPool.Names.SAME, because operations is executed by autodetect worker thread
|
// ThreadPool.Names.SAME, because operations is executed by autodetect worker thread
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -269,20 +269,16 @@ public class ForecastJobAction extends Action<ForecastJobAction.Request, Forecas
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void taskOperation(Request request, OpenJobAction.JobTask task, ActionListener<Response> listener) {
|
protected void taskOperation(Request request, OpenJobAction.JobTask task, ActionListener<Response> listener) {
|
||||||
|
ClusterState state = clusterService.state();
|
||||||
|
Job job = JobManager.getJobOrThrowIfUnknown(task.getJobId(), state);
|
||||||
|
validate(job, request);
|
||||||
|
|
||||||
ForecastParams.Builder paramsBuilder = ForecastParams.builder();
|
ForecastParams.Builder paramsBuilder = ForecastParams.builder();
|
||||||
|
|
||||||
if (request.getDuration() != null) {
|
if (request.getDuration() != null) {
|
||||||
TimeValue duration = request.getDuration();
|
|
||||||
TimeValue bucketSpan = jobManager.getJobOrThrowIfUnknown(task.getJobId()).getAnalysisConfig().getBucketSpan();
|
|
||||||
|
|
||||||
if (duration.compareTo(bucketSpan) < 0) {
|
|
||||||
throw new IllegalArgumentException(
|
|
||||||
"[" + DURATION.getPreferredName()
|
|
||||||
+ "] must be greater or equal to the bucket span: ["
|
|
||||||
+ duration.getStringRep() + "/" + bucketSpan.getStringRep() + "]");
|
|
||||||
}
|
|
||||||
|
|
||||||
paramsBuilder.duration(request.getDuration());
|
paramsBuilder.duration(request.getDuration());
|
||||||
}
|
}
|
||||||
|
|
||||||
if (request.getExpiresIn() != null) {
|
if (request.getExpiresIn() != null) {
|
||||||
paramsBuilder.expiresIn(request.getExpiresIn());
|
paramsBuilder.expiresIn(request.getExpiresIn());
|
||||||
}
|
}
|
||||||
|
@ -296,6 +292,24 @@ public class ForecastJobAction extends Action<ForecastJobAction.Request, Forecas
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void validate(Job job, Request request) {
|
||||||
|
if (job.getJobVersion() == null || job.getJobVersion().before(Version.V_6_1_0)) {
|
||||||
|
throw ExceptionsHelper.badRequestException(
|
||||||
|
"Cannot run forecast because jobs created prior to version 6.1 are not supported");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (request.getDuration() != null) {
|
||||||
|
TimeValue duration = request.getDuration();
|
||||||
|
TimeValue bucketSpan = job.getAnalysisConfig().getBucketSpan();
|
||||||
|
|
||||||
|
if (duration.compareTo(bucketSpan) < 0) {
|
||||||
|
throw ExceptionsHelper.badRequestException(
|
||||||
|
"[" + DURATION.getPreferredName() + "] must be greater or equal to the bucket span: [" + duration.getStringRep()
|
||||||
|
+ "/" + bucketSpan.getStringRep() + "]");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -757,7 +757,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
Builder setCreateTime(Date createTime) {
|
public Builder setCreateTime(Date createTime) {
|
||||||
this.createTime = createTime;
|
this.createTime = createTime;
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,10 +5,19 @@
|
||||||
*/
|
*/
|
||||||
package org.elasticsearch.xpack.ml.action;
|
package org.elasticsearch.xpack.ml.action;
|
||||||
|
|
||||||
|
import org.elasticsearch.ElasticsearchStatusException;
|
||||||
|
import org.elasticsearch.Version;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.common.xcontent.XContentParser;
|
import org.elasticsearch.common.xcontent.XContentParser;
|
||||||
import org.elasticsearch.test.AbstractStreamableXContentTestCase;
|
import org.elasticsearch.test.AbstractStreamableXContentTestCase;
|
||||||
import org.elasticsearch.xpack.ml.action.ForecastJobAction.Request;
|
import org.elasticsearch.xpack.ml.action.ForecastJobAction.Request;
|
||||||
|
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
|
||||||
|
import org.elasticsearch.xpack.ml.job.config.DataDescription;
|
||||||
|
import org.elasticsearch.xpack.ml.job.config.Detector;
|
||||||
|
import org.elasticsearch.xpack.ml.job.config.Job;
|
||||||
|
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.Date;
|
||||||
|
|
||||||
import static org.hamcrest.Matchers.equalTo;
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
|
|
||||||
|
@ -61,4 +70,51 @@ public class ForecastJobActionRequestTests extends AbstractStreamableXContentTes
|
||||||
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> new Request().setExpiresIn("-1s"));
|
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> new Request().setExpiresIn("-1s"));
|
||||||
assertThat(e.getMessage(), equalTo("[expires_in] must be non-negative: [-1]"));
|
assertThat(e.getMessage(), equalTo("[expires_in] must be non-negative: [-1]"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testValidate_jobVersionCannonBeBefore61() {
|
||||||
|
Job.Builder jobBuilder = createTestJob("forecast-it-test-job-version");
|
||||||
|
|
||||||
|
jobBuilder.setJobVersion(Version.V_6_0_1);
|
||||||
|
ForecastJobAction.Request request = new ForecastJobAction.Request();
|
||||||
|
Exception e = expectThrows(ElasticsearchStatusException.class,
|
||||||
|
() -> ForecastJobAction.TransportAction.validate(jobBuilder.build(), request));
|
||||||
|
assertEquals(
|
||||||
|
"Cannot run forecast because jobs created prior to version 6.1 are not supported",
|
||||||
|
e.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testValidate_jobVersionCannonBeBefore61NoJobVersion() {
|
||||||
|
Job.Builder jobBuilder = createTestJob("forecast-it-test-job-version");
|
||||||
|
|
||||||
|
ForecastJobAction.Request request = new ForecastJobAction.Request();
|
||||||
|
Exception e = expectThrows(ElasticsearchStatusException.class,
|
||||||
|
() -> ForecastJobAction.TransportAction.validate(jobBuilder.build(), request));
|
||||||
|
assertEquals(
|
||||||
|
"Cannot run forecast because jobs created prior to version 6.1 are not supported",
|
||||||
|
e.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testValidate_DurationCannotBeLessThanBucketSpan() {
|
||||||
|
Job.Builder jobBuilder = createTestJob("forecast-it-test-job-version");
|
||||||
|
|
||||||
|
ForecastJobAction.Request request = new ForecastJobAction.Request();
|
||||||
|
request.setDuration(TimeValue.timeValueMinutes(1));
|
||||||
|
Exception e = expectThrows(ElasticsearchStatusException.class,
|
||||||
|
() -> ForecastJobAction.TransportAction.validate(jobBuilder.build(new Date()), request));
|
||||||
|
assertEquals("[duration] must be greater or equal to the bucket span: [1m/1h]", e.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
private Job.Builder createTestJob(String jobId) {
|
||||||
|
Job.Builder jobBuilder = new Job.Builder(jobId);
|
||||||
|
jobBuilder.setCreateTime(new Date());
|
||||||
|
Detector.Builder detector = new Detector.Builder("mean", "value");
|
||||||
|
AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(detector.build()));
|
||||||
|
TimeValue bucketSpan = TimeValue.timeValueHours(1);
|
||||||
|
analysisConfig.setBucketSpan(bucketSpan);
|
||||||
|
DataDescription.Builder dataDescription = new DataDescription.Builder();
|
||||||
|
dataDescription.setTimeFormat("epoch");
|
||||||
|
jobBuilder.setAnalysisConfig(analysisConfig);
|
||||||
|
jobBuilder.setDataDescription(dataDescription);
|
||||||
|
return jobBuilder;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -153,7 +153,7 @@ public class ForecastIT extends MlNativeAutodetectIntegTestCase {
|
||||||
ElasticsearchException e = expectThrows(ElasticsearchException.class,() -> forecast(job.getId(),
|
ElasticsearchException e = expectThrows(ElasticsearchException.class,() -> forecast(job.getId(),
|
||||||
TimeValue.timeValueMinutes(10), null));
|
TimeValue.timeValueMinutes(10), null));
|
||||||
assertThat(e.getMessage(),
|
assertThat(e.getMessage(),
|
||||||
equalTo("java.lang.IllegalArgumentException: [duration] must be greater or equal to the bucket span: [10m/1h]"));
|
equalTo("[duration] must be greater or equal to the bucket span: [10m/1h]"));
|
||||||
}
|
}
|
||||||
|
|
||||||
private static Map<String, Object> createRecord(long timestamp, double value) {
|
private static Map<String, Object> createRecord(long timestamp, double value) {
|
||||||
|
|
Loading…
Reference in New Issue