[ML-FC] do not allow durations below the bucket span (elastic/x-pack-elasticsearch#3166)
do not allow durations below the bucket span Original commit: elastic/x-pack-elasticsearch@0e895c1ddd
This commit is contained in:
parent
c2ff796fea
commit
756d878983
|
@ -26,6 +26,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||||
import org.elasticsearch.common.xcontent.XContentParser;
|
import org.elasticsearch.common.xcontent.XContentParser;
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
import org.elasticsearch.transport.TransportService;
|
import org.elasticsearch.transport.TransportService;
|
||||||
|
import org.elasticsearch.xpack.ml.job.JobManager;
|
||||||
import org.elasticsearch.xpack.ml.job.config.Job;
|
import org.elasticsearch.xpack.ml.job.config.Job;
|
||||||
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
|
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
|
||||||
import org.elasticsearch.xpack.ml.job.process.autodetect.params.ForecastParams;
|
import org.elasticsearch.xpack.ml.job.process.autodetect.params.ForecastParams;
|
||||||
|
@ -34,6 +35,8 @@ import org.elasticsearch.xpack.ml.job.results.Forecast;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
|
|
||||||
|
import static org.elasticsearch.xpack.ml.action.ForecastJobAction.Request.DURATION;
|
||||||
|
|
||||||
public class ForecastJobAction extends Action<ForecastJobAction.Request, ForecastJobAction.Response, ForecastJobAction.RequestBuilder> {
|
public class ForecastJobAction extends Action<ForecastJobAction.Request, ForecastJobAction.Response, ForecastJobAction.RequestBuilder> {
|
||||||
|
|
||||||
public static final ForecastJobAction INSTANCE = new ForecastJobAction();
|
public static final ForecastJobAction INSTANCE = new ForecastJobAction();
|
||||||
|
@ -244,13 +247,16 @@ public class ForecastJobAction extends Action<ForecastJobAction.Request, Forecas
|
||||||
|
|
||||||
public static class TransportAction extends TransportJobTaskAction<Request, Response> {
|
public static class TransportAction extends TransportJobTaskAction<Request, Response> {
|
||||||
|
|
||||||
|
private final JobManager jobManager;
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, ClusterService clusterService,
|
public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, ClusterService clusterService,
|
||||||
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
|
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
|
||||||
AutodetectProcessManager processManager) {
|
JobManager jobManager, AutodetectProcessManager processManager) {
|
||||||
super(settings, ForecastJobAction.NAME, threadPool, clusterService, transportService, actionFilters,
|
super(settings, ForecastJobAction.NAME, threadPool, clusterService, transportService, actionFilters,
|
||||||
indexNameExpressionResolver, ForecastJobAction.Request::new, ForecastJobAction.Response::new, ThreadPool.Names.SAME,
|
indexNameExpressionResolver, ForecastJobAction.Request::new, ForecastJobAction.Response::new, ThreadPool.Names.SAME,
|
||||||
processManager);
|
processManager);
|
||||||
|
this.jobManager = jobManager;
|
||||||
// ThreadPool.Names.SAME, because operations is executed by autodetect worker thread
|
// ThreadPool.Names.SAME, because operations is executed by autodetect worker thread
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -265,6 +271,16 @@ public class ForecastJobAction extends Action<ForecastJobAction.Request, Forecas
|
||||||
protected void taskOperation(Request request, OpenJobAction.JobTask task, ActionListener<Response> listener) {
|
protected void taskOperation(Request request, OpenJobAction.JobTask task, ActionListener<Response> listener) {
|
||||||
ForecastParams.Builder paramsBuilder = ForecastParams.builder();
|
ForecastParams.Builder paramsBuilder = ForecastParams.builder();
|
||||||
if (request.getDuration() != null) {
|
if (request.getDuration() != null) {
|
||||||
|
TimeValue duration = request.getDuration();
|
||||||
|
TimeValue bucketSpan = jobManager.getJobOrThrowIfUnknown(task.getJobId()).getAnalysisConfig().getBucketSpan();
|
||||||
|
|
||||||
|
if (duration.compareTo(bucketSpan) < 0) {
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
"[" + DURATION.getPreferredName()
|
||||||
|
+ "] must be greater or equal to the bucket span: ["
|
||||||
|
+ duration.getStringRep() + "/" + bucketSpan.getStringRep() + "]");
|
||||||
|
}
|
||||||
|
|
||||||
paramsBuilder.duration(request.getDuration());
|
paramsBuilder.duration(request.getDuration());
|
||||||
}
|
}
|
||||||
if (request.getExpiresIn() != null) {
|
if (request.getExpiresIn() != null) {
|
||||||
|
|
|
@ -5,6 +5,7 @@
|
||||||
*/
|
*/
|
||||||
package org.elasticsearch.xpack.ml.integration;
|
package org.elasticsearch.xpack.ml.integration;
|
||||||
|
|
||||||
|
import org.elasticsearch.ElasticsearchException;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
|
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
|
||||||
import org.elasticsearch.xpack.ml.job.config.DataDescription;
|
import org.elasticsearch.xpack.ml.job.config.DataDescription;
|
||||||
|
@ -134,6 +135,27 @@ public class ForecastIT extends MlNativeAutodetectIntegTestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testDurationCannotBeLessThanBucketSpan() throws Exception {
|
||||||
|
Detector.Builder detector = new Detector.Builder("mean", "value");
|
||||||
|
|
||||||
|
TimeValue bucketSpan = TimeValue.timeValueHours(1);
|
||||||
|
AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(detector.build()));
|
||||||
|
analysisConfig.setBucketSpan(bucketSpan);
|
||||||
|
DataDescription.Builder dataDescription = new DataDescription.Builder();
|
||||||
|
dataDescription.setTimeFormat("epoch");
|
||||||
|
Job.Builder job = new Job.Builder("forecast-it-test-duration-bucket-span");
|
||||||
|
job.setAnalysisConfig(analysisConfig);
|
||||||
|
job.setDataDescription(dataDescription);
|
||||||
|
|
||||||
|
registerJob(job);
|
||||||
|
putJob(job);
|
||||||
|
openJob(job.getId());
|
||||||
|
ElasticsearchException e = expectThrows(ElasticsearchException.class,() -> forecast(job.getId(),
|
||||||
|
TimeValue.timeValueMinutes(10), null));
|
||||||
|
assertThat(e.getMessage(),
|
||||||
|
equalTo("java.lang.IllegalArgumentException: [duration] must be greater or equal to the bucket span: [10m/1h]"));
|
||||||
|
}
|
||||||
|
|
||||||
private static Map<String, Object> createRecord(long timestamp, double value) {
|
private static Map<String, Object> createRecord(long timestamp, double value) {
|
||||||
Map<String, Object> record = new HashMap<>();
|
Map<String, Object> record = new HashMap<>();
|
||||||
record.put("time", timestamp);
|
record.put("time", timestamp);
|
||||||
|
|
Loading…
Reference in New Issue