From 7d19264363583ed66b7bc1e1371faac015a9a8b3 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Thu, 14 Sep 2017 12:31:20 +0200 Subject: [PATCH] [ML-FC] Branch landing feature/ml (elastic/x-pack-elasticsearch#2500) integrate forecasting feature branch into master - add endpoint xpack/ml/job/forecast to request forecasting on data of ml-jobs - current parameters: end time - persists forecast results into shared or own index - different runs are separated by a 'forecast id' relates elastic/x-pack-elasticsearch#1838 Original commit: elastic/x-pack-elasticsearch@f9d701a6bc0ae56775e96cd39a2cdf2f8c48ccd5 --- .../xpack/ml/MachineLearning.java | 8 +- .../xpack/ml/action/ForecastJobAction.java | 234 +++++++++++++ .../persistence/ElasticsearchMappings.java | 20 ++ .../job/persistence/JobResultsPersister.java | 7 + .../autodetect/AutodetectCommunicator.java | 8 + .../process/autodetect/AutodetectProcess.java | 9 + .../autodetect/AutodetectProcessManager.java | 29 ++ .../BlackHoleAutodetectProcess.java | 9 +- .../autodetect/NativeAutodetectProcess.java | 11 +- .../output/AutoDetectResultProcessor.java | 16 + .../autodetect/params/ForecastParams.java | 102 ++++++ .../writer/ControlMsgToProcessWriter.java | 31 +- .../ml/job/results/AutodetectResult.java | 50 ++- .../xpack/ml/job/results/Forecast.java | 308 ++++++++++++++++++ .../xpack/ml/job/results/ForecastStats.java | 114 +++++++ .../ml/job/results/ReservedFieldNames.java | 4 + .../ml/rest/job/RestForecastJobAction.java | 50 +++ .../action/ForecastJobActionRequestTests.java | 35 ++ .../ForecastJobActionResponseTests.java | 23 ++ .../AutodetectResultProcessorIT.java | 18 +- .../params/ForecastParamsTests.java | 44 +++ .../ml/job/results/AutodetectResultTests.java | 15 +- .../ml/job/results/ForecastStatsTests.java | 46 +++ .../xpack/ml/job/results/ForecastTests.java | 68 ++++ .../org/elasticsearch/transport/actions | 1 + .../rest-api-spec/api/xpack.ml.forecast.json | 24 ++ .../rest-api-spec/test/ml/forecast.yml | 41 +++ 27 files changed, 1302 insertions(+), 23 deletions(-) create mode 100644 plugin/src/main/java/org/elasticsearch/xpack/ml/action/ForecastJobAction.java create mode 100644 plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/ForecastParams.java create mode 100644 plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/Forecast.java create mode 100644 plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/ForecastStats.java create mode 100644 plugin/src/main/java/org/elasticsearch/xpack/ml/rest/job/RestForecastJobAction.java create mode 100644 plugin/src/test/java/org/elasticsearch/xpack/ml/action/ForecastJobActionRequestTests.java create mode 100644 plugin/src/test/java/org/elasticsearch/xpack/ml/action/ForecastJobActionResponseTests.java create mode 100644 plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/ForecastParamsTests.java create mode 100644 plugin/src/test/java/org/elasticsearch/xpack/ml/job/results/ForecastStatsTests.java create mode 100644 plugin/src/test/java/org/elasticsearch/xpack/ml/job/results/ForecastTests.java create mode 100644 plugin/src/test/resources/rest-api-spec/api/xpack.ml.forecast.json create mode 100644 plugin/src/test/resources/rest-api-spec/test/ml/forecast.yml diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 68701a5a5d1..57bde0eb70c 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -44,6 +44,7 @@ import org.elasticsearch.xpack.ml.action.DeleteJobAction; import org.elasticsearch.xpack.ml.action.DeleteModelSnapshotAction; import org.elasticsearch.xpack.ml.action.FinalizeJobExecutionAction; import org.elasticsearch.xpack.ml.action.FlushJobAction; +import org.elasticsearch.xpack.ml.action.ForecastJobAction; import org.elasticsearch.xpack.ml.action.GetBucketsAction; import org.elasticsearch.xpack.ml.action.GetCategoriesAction; import org.elasticsearch.xpack.ml.action.GetDatafeedsAction; @@ -108,6 +109,7 @@ import org.elasticsearch.xpack.ml.rest.filter.RestPutFilterAction; import org.elasticsearch.xpack.ml.rest.job.RestCloseJobAction; import org.elasticsearch.xpack.ml.rest.job.RestDeleteJobAction; import org.elasticsearch.xpack.ml.rest.job.RestFlushJobAction; +import org.elasticsearch.xpack.ml.rest.job.RestForecastJobAction; import org.elasticsearch.xpack.ml.rest.job.RestGetJobStatsAction; import org.elasticsearch.xpack.ml.rest.job.RestGetJobsAction; import org.elasticsearch.xpack.ml.rest.job.RestOpenJobAction; @@ -383,7 +385,8 @@ public class MachineLearning implements ActionPlugin { new RestStartDatafeedAction(settings, restController), new RestStopDatafeedAction(settings, restController), new RestDeleteModelSnapshotAction(settings, restController), - new RestDeleteExpiredDataAction(settings, restController) + new RestDeleteExpiredDataAction(settings, restController), + new RestForecastJobAction(settings, restController) ); } @@ -431,7 +434,8 @@ public class MachineLearning implements ActionPlugin { new ActionHandler<>(CompletionPersistentTaskAction.INSTANCE, CompletionPersistentTaskAction.TransportAction.class), new ActionHandler<>(RemovePersistentTaskAction.INSTANCE, RemovePersistentTaskAction.TransportAction.class), new ActionHandler<>(UpdateProcessAction.INSTANCE, UpdateProcessAction.TransportAction.class), - new ActionHandler<>(DeleteExpiredDataAction.INSTANCE, DeleteExpiredDataAction.TransportAction.class) + new ActionHandler<>(DeleteExpiredDataAction.INSTANCE, DeleteExpiredDataAction.TransportAction.class), + new ActionHandler<>(ForecastJobAction.INSTANCE, ForecastJobAction.TransportAction.class) ); } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/ForecastJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/ForecastJobAction.java new file mode 100644 index 00000000000..6d6673b783d --- /dev/null +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/ForecastJobAction.java @@ -0,0 +1,234 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.action; + +import org.elasticsearch.action.Action; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequestBuilder; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.tasks.BaseTasksResponse; +import org.elasticsearch.client.ElasticsearchClient; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.ml.job.config.Job; +import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; +import org.elasticsearch.xpack.ml.job.process.autodetect.params.ForecastParams; + +import java.io.IOException; +import java.util.Objects; + +import static org.elasticsearch.xpack.ml.action.ForecastJobAction.Request.END_TIME; + +public class ForecastJobAction extends Action { + + public static final ForecastJobAction INSTANCE = new ForecastJobAction(); + public static final String NAME = "cluster:admin/xpack/ml/job/forecast"; + + private ForecastJobAction() { + super(NAME); + } + + @Override + public RequestBuilder newRequestBuilder(ElasticsearchClient client) { + return new RequestBuilder(client, this); + } + + @Override + public Response newResponse() { + return new Response(); + } + + public static class Request extends TransportJobTaskAction.JobTaskRequest implements ToXContentObject { + + public static final ParseField END_TIME = new ParseField("end"); + + private static final ObjectParser PARSER = new ObjectParser<>(NAME, Request::new); + + static { + PARSER.declareString((request, jobId) -> request.jobId = jobId, Job.ID); + PARSER.declareString(Request::setEndTime, END_TIME); + } + + public static Request parseRequest(String jobId, XContentParser parser) { + Request request = PARSER.apply(parser, null); + if (jobId != null) { + request.jobId = jobId; + } + return request; + } + + private String endTime; + + Request() { + } + + public Request(String jobId) { + super(jobId); + } + + public String getEndTime() { + return endTime; + } + + public void setEndTime(String endTime) { + this.endTime = endTime; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + this.endTime = in.readOptionalString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeOptionalString(endTime); + } + + @Override + public int hashCode() { + return Objects.hash(jobId, endTime); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + Request other = (Request) obj; + return Objects.equals(jobId, other.jobId) && Objects.equals(endTime, other.endTime); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(Job.ID.getPreferredName(), jobId); + if (endTime != null) { + builder.field(END_TIME.getPreferredName(), endTime); + } + builder.endObject(); + return builder; + } + } + + static class RequestBuilder extends ActionRequestBuilder { + + RequestBuilder(ElasticsearchClient client, ForecastJobAction action) { + super(client, action, new Request()); + } + } + + public static class Response extends BaseTasksResponse implements Writeable, ToXContentObject { + + private boolean acknowledged; + private long id; + + Response() { + super(null, null); + } + + Response(boolean acknowledged, long id) { + super(null, null); + this.acknowledged = acknowledged; + this.id = id; + } + + public boolean isacknowledged() { + return acknowledged; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + acknowledged = in.readBoolean(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeBoolean(acknowledged); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field("acknowledged", acknowledged); + builder.field("id", id); + builder.endObject(); + return builder; + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + Response response = (Response) o; + return acknowledged == response.acknowledged; + } + + @Override + public int hashCode() { + return Objects.hash(acknowledged); + } + } + + public static class TransportAction extends TransportJobTaskAction { + + @Inject + public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, ClusterService clusterService, + ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, + AutodetectProcessManager processManager) { + super(settings, ForecastJobAction.NAME, threadPool, clusterService, transportService, actionFilters, + indexNameExpressionResolver, ForecastJobAction.Request::new, ForecastJobAction.Response::new, ThreadPool.Names.SAME, + processManager); + // ThreadPool.Names.SAME, because operations is executed by + // autodetect worker thread + } + + @Override + protected ForecastJobAction.Response readTaskResponse(StreamInput in) throws IOException { + Response response = new Response(); + response.readFrom(in); + return response; + } + + @Override + protected void taskOperation(Request request, OpenJobAction.JobTask task, ActionListener listener) { + ForecastParams.Builder paramsBuilder = ForecastParams.builder(); + if (request.getEndTime() != null) { + paramsBuilder.endTime(request.getEndTime(), END_TIME); + } + + ForecastParams params = paramsBuilder.build(); + processManager.forecastJob(task, params, e -> { + if (e == null) { + listener.onResponse(new Response(true, params.getId())); + } else { + listener.onFailure(e); + } + }); + } + } +} + diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchMappings.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchMappings.java index 517b7f77393..d538c24f1ec 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchMappings.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchMappings.java @@ -17,6 +17,7 @@ import org.elasticsearch.xpack.ml.job.results.AnomalyRecord; import org.elasticsearch.xpack.ml.job.results.Bucket; import org.elasticsearch.xpack.ml.job.results.BucketInfluencer; import org.elasticsearch.xpack.ml.job.results.CategoryDefinition; +import org.elasticsearch.xpack.ml.job.results.Forecast; import org.elasticsearch.xpack.ml.job.results.Influence; import org.elasticsearch.xpack.ml.job.results.Influencer; import org.elasticsearch.xpack.ml.job.results.ModelPlot; @@ -289,6 +290,7 @@ public class ElasticsearchMappings { .field(TYPE, DOUBLE) .endObject(); + addForecastFieldsToMapping(builder); addAnomalyRecordFieldsToMapping(builder); addInfluencerFieldsToMapping(builder); addModelSizeStatsFieldsToMapping(builder); @@ -320,6 +322,24 @@ public class ElasticsearchMappings { } } + private static void addForecastFieldsToMapping(XContentBuilder builder) throws IOException { + + // Forecast Output + builder.startObject(Forecast.FORECAST_LOWER.getPreferredName()) + .field(TYPE, DOUBLE) + .endObject() + .startObject(Forecast.FORECAST_UPPER.getPreferredName()) + .field(TYPE, DOUBLE) + .endObject() + .startObject(Forecast.FORECAST_PREDICTION.getPreferredName()) + .field(TYPE, DOUBLE) + .endObject() + .startObject(Forecast.FORECAST_ID.getPreferredName()) + .field(TYPE, LONG) + .endObject(); + } + + /** * AnomalyRecord fields to be added under the 'properties' section of the mapping * @param builder Add properties to this builder diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java index e059c2debf4..06c14dc8674 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java @@ -29,6 +29,7 @@ import org.elasticsearch.xpack.ml.job.results.AnomalyRecord; import org.elasticsearch.xpack.ml.job.results.Bucket; import org.elasticsearch.xpack.ml.job.results.BucketInfluencer; import org.elasticsearch.xpack.ml.job.results.CategoryDefinition; +import org.elasticsearch.xpack.ml.job.results.Forecast; import org.elasticsearch.xpack.ml.job.results.Influencer; import org.elasticsearch.xpack.ml.job.results.ModelPlot; @@ -151,6 +152,12 @@ public class JobResultsPersister extends AbstractComponent { return this; } + public Builder persistForecast(Forecast forecast) { + logger.trace("[{}] ES BULK ACTION: index forecast to index [{}] with ID [{}]", jobId, indexName, forecast.getId()); + indexResult(forecast.getId(), forecast, Forecast.RESULT_TYPE_VALUE); + return this; + } + private void indexResult(String id, ToXContent resultDoc, String resultType) { try (XContentBuilder content = toXContentBuilder(resultDoc)) { bulkRequest.add(new IndexRequest(indexName, DOC_TYPE, id).source(content)); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java index 5875b5249a2..0dcc25c8825 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java @@ -27,6 +27,7 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResult import org.elasticsearch.xpack.ml.job.process.autodetect.output.FlushAcknowledgement; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams; +import org.elasticsearch.xpack.ml.job.process.autodetect.params.ForecastParams; import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot; @@ -205,6 +206,13 @@ public class AutodetectCommunicator implements Closeable { }, handler); } + public void forecastJob(ForecastParams params, BiConsumer handler) { + submitOperation(() -> { + autodetectProcess.forecastJob(params); + return null; + }, handler); + } + @Nullable FlushAcknowledgement waitFlushToCompletion(String flushId) { LOGGER.debug("[{}] waiting for flush", job.getId()); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcess.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcess.java index c8ab913c0b8..7d4078a035f 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcess.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcess.java @@ -10,6 +10,7 @@ import org.elasticsearch.xpack.ml.job.config.ModelPlotConfig; import org.elasticsearch.xpack.ml.job.persistence.StateStreamer; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams; +import org.elasticsearch.xpack.ml.job.process.autodetect.params.ForecastParams; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.ml.job.results.AutodetectResult; @@ -85,6 +86,14 @@ public interface AutodetectProcess extends Closeable { */ String flushJob(FlushJobParams params) throws IOException; + /** + * Do a forecast on a running job. + * + * @param params The forecast parameters + * @throws IOException If the write fails + */ + void forecastJob(ForecastParams params) throws IOException; + /** * Flush the output data stream */ diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index 1a82777f214..996d0a76586 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -42,6 +42,7 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.output.FlushAcknowledge import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams; +import org.elasticsearch.xpack.ml.job.process.autodetect.params.ForecastParams; import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot; @@ -49,6 +50,7 @@ import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerFactory; import org.elasticsearch.xpack.ml.job.process.normalizer.Renormalizer; import org.elasticsearch.xpack.ml.job.process.normalizer.ScoresUpdater; import org.elasticsearch.xpack.ml.job.process.normalizer.ShortCircuitingRenormalizer; +import org.elasticsearch.xpack.ml.job.results.Forecast; import org.elasticsearch.xpack.ml.notifications.Auditor; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; @@ -240,6 +242,33 @@ public class AutodetectProcessManager extends AbstractComponent { }); } + /** + * Do a forecast for the running job. + * + * @param jobTask The job task + * @param params Forecast parameters + */ + public void forecastJob(JobTask jobTask, ForecastParams params, Consumer handler) { + logger.debug("Forecasting job {}", jobTask.getJobId()); + AutodetectCommunicator communicator = autoDetectCommunicatorByOpenJob.get(jobTask.getAllocationId()); + if (communicator == null) { + String message = String.format(Locale.ROOT, "Cannot forecast because job [%s] is not open", jobTask.getJobId()); + logger.debug(message); + handler.accept(ExceptionsHelper.conflictStatusException(message)); + return; + } + + communicator.forecastJob(params, (aVoid, e) -> { + if (e == null) { + handler.accept(null); + } else { + String msg = String.format(Locale.ROOT, "[%s] exception while forecasting job", jobTask.getJobId()); + logger.error(msg, e); + handler.accept(ExceptionsHelper.serverError(msg, e)); + } + }); + } + public void writeUpdateProcessMessage(JobTask jobTask, List updates, ModelPlotConfig config, Consumer handler) { AutodetectCommunicator communicator = autoDetectCommunicatorByOpenJob.get(jobTask.getAllocationId()); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java index a9d091a5b82..5eba87765f6 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java @@ -11,6 +11,7 @@ import org.elasticsearch.xpack.ml.job.persistence.StateStreamer; import org.elasticsearch.xpack.ml.job.process.autodetect.output.FlushAcknowledgement; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams; +import org.elasticsearch.xpack.ml.job.process.autodetect.params.ForecastParams; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles; import org.elasticsearch.xpack.ml.job.results.AutodetectResult; @@ -78,7 +79,7 @@ public class BlackHoleAutodetectProcess implements AutodetectProcess { @Override public String flushJob(FlushJobParams params) throws IOException { FlushAcknowledgement flushAcknowledgement = new FlushAcknowledgement(FLUSH_ID, null); - AutodetectResult result = new AutodetectResult(null, null, null, null, null, null, null, null, flushAcknowledgement); + AutodetectResult result = new AutodetectResult(null, null, null, null, null, null, null, null, null, null, flushAcknowledgement); results.add(result); return FLUSH_ID; } @@ -91,7 +92,7 @@ public class BlackHoleAutodetectProcess implements AutodetectProcess { public void close() throws IOException { if (open) { Quantiles quantiles = new Quantiles(jobId, new Date(), "black hole quantiles"); - AutodetectResult result = new AutodetectResult(null, null, null, quantiles, null, null, null, null, null); + AutodetectResult result = new AutodetectResult(null, null, null, quantiles, null, null, null, null, null, null, null); results.add(result); open = false; } @@ -147,4 +148,8 @@ public class BlackHoleAutodetectProcess implements AutodetectProcess { public String readError() { return ""; } + + @Override + public void forecastJob(ForecastParams params) throws IOException { + } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java index 3c2bf1c5589..9cee3db760a 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java @@ -17,6 +17,7 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectResult import org.elasticsearch.xpack.ml.job.process.autodetect.output.StateProcessor; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams; +import org.elasticsearch.xpack.ml.job.process.autodetect.params.ForecastParams; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.ml.job.process.autodetect.writer.ControlMsgToProcessWriter; import org.elasticsearch.xpack.ml.job.process.autodetect.writer.LengthEncodedWriter; @@ -94,7 +95,9 @@ class NativeAutodetectProcess implements AutodetectProcess { if (processCloseInitiated == false && processKilled == false) { // The log message doesn't say "crashed", as the process could have been killed // by a user or other process (e.g. the Linux OOM killer) - LOGGER.error("[{}] autodetect process stopped unexpectedly", jobId); + + String errors = cppLogHandler.getErrors(); + LOGGER.error("[{}] autodetect process stopped unexpectedly: {}", jobId, errors); onProcessCrash.run(); } } @@ -163,6 +166,12 @@ class NativeAutodetectProcess implements AutodetectProcess { return writer.writeFlushMessage(); } + @Override + public void forecastJob(ForecastParams params) throws IOException { + ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(recordWriter, numberOfAnalysisFields); + writer.writeForecastMessage(params); + } + @Override public void flushStream() throws IOException { recordWriter.flush(); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java index 704b6d6082a..2a907a0c652 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java @@ -26,6 +26,8 @@ import org.elasticsearch.xpack.ml.job.results.AnomalyRecord; import org.elasticsearch.xpack.ml.job.results.AutodetectResult; import org.elasticsearch.xpack.ml.job.results.Bucket; import org.elasticsearch.xpack.ml.job.results.CategoryDefinition; +import org.elasticsearch.xpack.ml.job.results.Forecast; +import org.elasticsearch.xpack.ml.job.results.ForecastStats; import org.elasticsearch.xpack.ml.job.results.Influencer; import org.elasticsearch.xpack.ml.job.results.ModelPlot; @@ -184,6 +186,20 @@ public class AutoDetectResultProcessor { if (modelPlot != null) { context.bulkResultsPersister.persistModelPlot(modelPlot); } + Forecast forecast = result.getForecast(); + if (forecast != null) { + context.bulkResultsPersister.persistForecast(forecast); + } + ForecastStats forecastStats = result.getForecastStats(); + if (forecastStats != null) { + // forecast stats are send by autodetect but do not get persisted, + // still they mark the end of a forecast + + LOGGER.trace("Received Forecast Stats [{}]", forecastStats.getId()); + + // forecast stats mark the end of a forecast, therefore commit whatever we have + context.bulkResultsPersister.executeRequest(); + } ModelSizeStats modelSizeStats = result.getModelSizeStats(); if (modelSizeStats != null) { LOGGER.trace("[{}] Parsed ModelSizeStats: {} / {} / {} / {} / {} / {}", diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/ForecastParams.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/ForecastParams.java new file mode 100644 index 00000000000..d1e3f38d23d --- /dev/null +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/ForecastParams.java @@ -0,0 +1,102 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.job.process.autodetect.params; + +import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.joda.DateMathParser; +import org.elasticsearch.index.mapper.DateFieldMapper; +import org.elasticsearch.xpack.ml.job.messages.Messages; + +import java.util.Objects; + +public class ForecastParams { + + private final long endTime; + private final long id; + + private ForecastParams(long id, long endTime) { + this.id = id; + this.endTime = endTime; + } + + /** + * The forecast end time in seconds from the epoch + * @return The end time in seconds from the epoch + */ + public long getEndTime() { + return endTime; + } + + /** + * The forecast id + * + * @return The forecast Id + */ + public long getId() { + return id; + } + + @Override + public int hashCode() { + return Objects.hash(id, endTime); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + ForecastParams other = (ForecastParams) obj; + return Objects.equals(id, other.id) && Objects.equals(endTime, other.endTime); + } + + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + private long endTimeEpochSecs; + private long startTime; + private long forecastId; + + private Builder() { + startTime = System.currentTimeMillis(); + endTimeEpochSecs = tomorrow(startTime); + forecastId = generateId(); + } + + static long tomorrow(long now) { + return (now / 1000) + (60 * 60 * 24); + } + + private long generateId() { + return startTime; + } + + public Builder endTime(String endTime, ParseField paramName) { + DateMathParser dateMathParser = new DateMathParser(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER); + + try { + endTimeEpochSecs = dateMathParser.parse(endTime, System::currentTimeMillis) / 1000; + } catch (Exception e) { + String msg = Messages.getMessage(Messages.REST_INVALID_DATETIME_PARAMS, paramName.getPreferredName(), endTime); + throw new ElasticsearchParseException(msg, e); + } + + return this; + } + + public ForecastParams build() { + return new ForecastParams(forecastId, endTimeEpochSecs); + } + } +} + diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriter.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriter.java index a43b26890ec..a174c528330 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriter.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriter.java @@ -7,11 +7,13 @@ package org.elasticsearch.xpack.ml.job.process.autodetect.writer; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.xpack.ml.job.config.DetectionRule; import org.elasticsearch.xpack.ml.job.config.ModelPlotConfig; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams; +import org.elasticsearch.xpack.ml.job.process.autodetect.params.ForecastParams; import java.io.IOException; import java.io.OutputStream; @@ -37,6 +39,11 @@ public class ControlMsgToProcessWriter { */ private static final String FLUSH_MESSAGE_CODE = "f"; + /** + * This must match the code defined in the api::CAnomalyDetector C++ class. + */ + private static final String FORECAST_MESSAGE_CODE = "p"; + /** * This must match the code defined in the api::CAnomalyDetector C++ class. */ @@ -137,14 +144,32 @@ public class ControlMsgToProcessWriter { String flushId = Long.toString(ms_FlushNumber.getAndIncrement()); writeMessage(FLUSH_MESSAGE_CODE + flushId); - char[] spaces = new char[FLUSH_SPACES_LENGTH]; - Arrays.fill(spaces, ' '); - writeMessage(new String(spaces)); + fillCommandBuffer(); lengthEncodedWriter.flush(); return flushId; } + public void writeForecastMessage(ForecastParams params) throws IOException { + XContentBuilder builder = XContentFactory.jsonBuilder() + .startObject() + .field("forecast_id", params.getId()) + .field("end_time", params.getEndTime()) + .endObject(); + + writeMessage(FORECAST_MESSAGE_CODE + builder.string()); + fillCommandBuffer(); + lengthEncodedWriter.flush(); + } + + // todo(hendrikm): workaround, see + // https://github.com/elastic/machine-learning-cpp/issues/123 + private void fillCommandBuffer() throws IOException { + char[] spaces = new char[FLUSH_SPACES_LENGTH]; + Arrays.fill(spaces, ' '); + writeMessage(new String(spaces)); + } + public void writeResetBucketsMessage(DataLoadParams params) throws IOException { writeControlCodeFollowedByTimeRange(RESET_BUCKETS_MESSAGE_CODE, params.getStart(), params.getEnd()); } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/AutodetectResult.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/AutodetectResult.java index c572e8dd434..be8a5841690 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/AutodetectResult.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/AutodetectResult.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.ml.job.results; +import org.elasticsearch.Version; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -31,7 +32,7 @@ public class AutodetectResult implements ToXContentObject, Writeable { TYPE.getPreferredName(), a -> new AutodetectResult((Bucket) a[0], (List) a[1], (List) a[2], (Quantiles) a[3], a[4] == null ? null : ((ModelSnapshot.Builder) a[4]).build(), a[5] == null ? null : ((ModelSizeStats.Builder) a[5]).build(), - (ModelPlot) a[6], (CategoryDefinition) a[7], (FlushAcknowledgement) a[8])); + (ModelPlot) a[6], (Forecast) a[7], (ForecastStats) a[8], (CategoryDefinition) a[9], (FlushAcknowledgement) a[10])); static { PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), Bucket.PARSER, Bucket.RESULT_TYPE_FIELD); @@ -42,6 +43,8 @@ public class AutodetectResult implements ToXContentObject, Writeable { PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), ModelSizeStats.PARSER, ModelSizeStats.RESULT_TYPE_FIELD); PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), ModelPlot.PARSER, ModelPlot.RESULTS_FIELD); + PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), Forecast.PARSER, Forecast.RESULTS_FIELD); + PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), ForecastStats.PARSER, ForecastStats.RESULTS_FIELD); PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), CategoryDefinition.PARSER, CategoryDefinition.TYPE); PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), FlushAcknowledgement.PARSER, FlushAcknowledgement.TYPE); } @@ -53,12 +56,14 @@ public class AutodetectResult implements ToXContentObject, Writeable { private final ModelSnapshot modelSnapshot; private final ModelSizeStats modelSizeStats; private final ModelPlot modelPlot; + private final Forecast forecast; + private final ForecastStats forecastStats; private final CategoryDefinition categoryDefinition; private final FlushAcknowledgement flushAcknowledgement; public AutodetectResult(Bucket bucket, List records, List influencers, Quantiles quantiles, - ModelSnapshot modelSnapshot, ModelSizeStats modelSizeStats, ModelPlot modelPlot, - CategoryDefinition categoryDefinition, FlushAcknowledgement flushAcknowledgement) { + ModelSnapshot modelSnapshot, ModelSizeStats modelSizeStats, ModelPlot modelPlot, Forecast forecast, + ForecastStats forecastStats, CategoryDefinition categoryDefinition, FlushAcknowledgement flushAcknowledgement) { this.bucket = bucket; this.records = records; this.influencers = influencers; @@ -66,6 +71,8 @@ public class AutodetectResult implements ToXContentObject, Writeable { this.modelSnapshot = modelSnapshot; this.modelSizeStats = modelSizeStats; this.modelPlot = modelPlot; + this.forecast = forecast; + this.forecastStats = forecastStats; this.categoryDefinition = categoryDefinition; this.flushAcknowledgement = flushAcknowledgement; } @@ -116,6 +123,22 @@ public class AutodetectResult implements ToXContentObject, Writeable { } else { this.flushAcknowledgement = null; } + + if (in.getVersion().onOrAfter(Version.V_6_1_0)) { + if (in.readBoolean()) { + this.forecast = new Forecast(in); + } else { + this.forecast = null; + } + if (in.readBoolean()) { + this.forecastStats = new ForecastStats(in); + } else { + this.forecastStats = null; + } + } else { + this.forecast = null; + this.forecastStats = null; + } } @Override @@ -129,6 +152,11 @@ public class AutodetectResult implements ToXContentObject, Writeable { writeNullable(modelPlot, out); writeNullable(categoryDefinition, out); writeNullable(flushAcknowledgement, out); + + if (out.getVersion().onOrAfter(Version.V_6_1_0)) { + writeNullable(forecast, out); + writeNullable(forecastStats, out); + } } private static void writeNullable(Writeable writeable, StreamOutput out) throws IOException { @@ -157,6 +185,8 @@ public class AutodetectResult implements ToXContentObject, Writeable { addNullableField(ModelSnapshot.TYPE, modelSnapshot, builder); addNullableField(ModelSizeStats.RESULT_TYPE_FIELD, modelSizeStats, builder); addNullableField(ModelPlot.RESULTS_FIELD, modelPlot, builder); + addNullableField(Forecast.RESULTS_FIELD, forecast, builder); + addNullableField(ForecastStats.RESULTS_FIELD, forecastStats, builder); addNullableField(CategoryDefinition.TYPE, categoryDefinition, builder); addNullableField(FlushAcknowledgement.TYPE, flushAcknowledgement, builder); builder.endObject(); @@ -203,6 +233,14 @@ public class AutodetectResult implements ToXContentObject, Writeable { return modelPlot; } + public Forecast getForecast() { + return forecast; + } + + public ForecastStats getForecastStats() { + return forecastStats; + } + public CategoryDefinition getCategoryDefinition() { return categoryDefinition; } @@ -213,8 +251,8 @@ public class AutodetectResult implements ToXContentObject, Writeable { @Override public int hashCode() { - return Objects.hash(bucket, records, influencers, categoryDefinition, flushAcknowledgement, modelPlot, modelSizeStats, - modelSnapshot, quantiles); + return Objects.hash(bucket, records, influencers, categoryDefinition, flushAcknowledgement, modelPlot, forecast, forecastStats, + modelSizeStats, modelSnapshot, quantiles); } @Override @@ -232,6 +270,8 @@ public class AutodetectResult implements ToXContentObject, Writeable { Objects.equals(categoryDefinition, other.categoryDefinition) && Objects.equals(flushAcknowledgement, other.flushAcknowledgement) && Objects.equals(modelPlot, other.modelPlot) && + Objects.equals(forecast, other.forecast) && + Objects.equals(forecastStats, other.forecastStats) && Objects.equals(modelSizeStats, other.modelSizeStats) && Objects.equals(modelSnapshot, other.modelSnapshot) && Objects.equals(quantiles, other.quantiles); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/Forecast.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/Forecast.java new file mode 100644 index 00000000000..3a558fbffff --- /dev/null +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/Forecast.java @@ -0,0 +1,308 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.job.results; + +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ObjectParser.ValueType; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser.Token; +import org.elasticsearch.xpack.ml.job.config.Job; +import org.elasticsearch.xpack.ml.utils.time.TimeUtils; + +import java.io.IOException; +import java.util.Date; +import java.util.Objects; + +/** + * Model Forecast POJO. + */ +public class Forecast implements ToXContentObject, Writeable { + /** + * Result type + */ + public static final String RESULT_TYPE_VALUE = "model_forecast"; + public static final ParseField RESULTS_FIELD = new ParseField(RESULT_TYPE_VALUE); + + public static final ParseField FORECAST_ID = new ParseField("forecast_id"); + public static final ParseField PARTITION_FIELD_NAME = new ParseField("partition_field_name"); + public static final ParseField PARTITION_FIELD_VALUE = new ParseField("partition_field_value"); + public static final ParseField OVER_FIELD_NAME = new ParseField("over_field_name"); + public static final ParseField OVER_FIELD_VALUE = new ParseField("over_field_value"); + public static final ParseField BY_FIELD_NAME = new ParseField("by_field_name"); + public static final ParseField BY_FIELD_VALUE = new ParseField("by_field_value"); + public static final ParseField MODEL_FEATURE = new ParseField("model_feature"); + public static final ParseField FORECAST_LOWER = new ParseField("forecast_lower"); + public static final ParseField FORECAST_UPPER = new ParseField("forecast_upper"); + public static final ParseField FORECAST_PREDICTION = new ParseField("forecast_prediction"); + public static final ParseField BUCKET_SPAN = new ParseField("bucket_span"); + + public static final ConstructingObjectParser PARSER = + new ConstructingObjectParser<>(RESULT_TYPE_VALUE, a -> new Forecast((String) a[0], (long) a[1], (Date) a[2], (long) a[3])); + + static { + PARSER.declareString(ConstructingObjectParser.constructorArg(), Job.ID); + PARSER.declareLong(ConstructingObjectParser.constructorArg(), FORECAST_ID); + PARSER.declareField(ConstructingObjectParser.constructorArg(), p -> { + if (p.currentToken() == Token.VALUE_NUMBER) { + return new Date(p.longValue()); + } else if (p.currentToken() == Token.VALUE_STRING) { + return new Date(TimeUtils.dateStringToEpoch(p.text())); + } + throw new IllegalArgumentException("unexpected token [" + p.currentToken() + "] for [" + + Result.TIMESTAMP.getPreferredName() + "]"); + }, Result.TIMESTAMP, ValueType.VALUE); + PARSER.declareLong(ConstructingObjectParser.constructorArg(), BUCKET_SPAN); + PARSER.declareString((modelForecast, s) -> {}, Result.RESULT_TYPE); + PARSER.declareString(Forecast::setPartitionFieldName, PARTITION_FIELD_NAME); + PARSER.declareString(Forecast::setPartitionFieldValue, PARTITION_FIELD_VALUE); + PARSER.declareString(Forecast::setOverFieldName, OVER_FIELD_NAME); + PARSER.declareString(Forecast::setOverFieldValue, OVER_FIELD_VALUE); + PARSER.declareString(Forecast::setByFieldName, BY_FIELD_NAME); + PARSER.declareString(Forecast::setByFieldValue, BY_FIELD_VALUE); + PARSER.declareString(Forecast::setModelFeature, MODEL_FEATURE); + PARSER.declareDouble(Forecast::setForecastLower, FORECAST_LOWER); + PARSER.declareDouble(Forecast::setForecastUpper, FORECAST_UPPER); + PARSER.declareDouble(Forecast::setForecastPrediction, FORECAST_PREDICTION); + } + + private final String jobId; + private final long forecastId; + private final Date timestamp; + private final long bucketSpan; + private String partitionFieldName; + private String partitionFieldValue; + private String overFieldName; + private String overFieldValue; + private String byFieldName; + private String byFieldValue; + private String modelFeature; + private double forecastLower; + private double forecastUpper; + private double forecastPrediction; + + public Forecast(String jobId, long forecastId, Date timestamp, long bucketSpan) { + this.jobId = jobId; + this.forecastId = forecastId; + this.timestamp = timestamp; + this.bucketSpan = bucketSpan; + } + + public Forecast(StreamInput in) throws IOException { + jobId = in.readString(); + forecastId = in.readLong(); + timestamp = new Date(in.readLong()); + partitionFieldName = in.readOptionalString(); + partitionFieldValue = in.readOptionalString(); + overFieldName = in.readOptionalString(); + overFieldValue = in.readOptionalString(); + byFieldName = in.readOptionalString(); + byFieldValue = in.readOptionalString(); + modelFeature = in.readOptionalString(); + forecastLower = in.readDouble(); + forecastUpper = in.readDouble(); + forecastPrediction = in.readDouble(); + bucketSpan = in.readLong(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(jobId); + out.writeLong(forecastId); + out.writeLong(timestamp.getTime()); + out.writeOptionalString(partitionFieldName); + out.writeOptionalString(partitionFieldValue); + out.writeOptionalString(overFieldName); + out.writeOptionalString(overFieldValue); + out.writeOptionalString(byFieldName); + out.writeOptionalString(byFieldValue); + out.writeOptionalString(modelFeature); + out.writeDouble(forecastLower); + out.writeDouble(forecastUpper); + out.writeDouble(forecastPrediction); + out.writeLong(bucketSpan); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(Job.ID.getPreferredName(), jobId); + builder.field(FORECAST_ID.getPreferredName(), forecastId); + builder.field(Result.RESULT_TYPE.getPreferredName(), RESULT_TYPE_VALUE); + builder.field(BUCKET_SPAN.getPreferredName(), bucketSpan); + if (timestamp != null) { + builder.dateField(Result.TIMESTAMP.getPreferredName(), + Result.TIMESTAMP.getPreferredName() + "_string", timestamp.getTime()); + } + if (partitionFieldName != null) { + builder.field(PARTITION_FIELD_NAME.getPreferredName(), partitionFieldName); + } + if (partitionFieldValue != null) { + builder.field(PARTITION_FIELD_VALUE.getPreferredName(), partitionFieldValue); + } + if (overFieldName != null) { + builder.field(OVER_FIELD_NAME.getPreferredName(), overFieldName); + } + if (overFieldValue != null) { + builder.field(OVER_FIELD_VALUE.getPreferredName(), overFieldValue); + } + if (byFieldName != null) { + builder.field(BY_FIELD_NAME.getPreferredName(), byFieldName); + } + if (byFieldValue != null) { + builder.field(BY_FIELD_VALUE.getPreferredName(), byFieldValue); + } + if (modelFeature != null) { + builder.field(MODEL_FEATURE.getPreferredName(), modelFeature); + } + builder.field(FORECAST_LOWER.getPreferredName(), forecastLower); + builder.field(FORECAST_UPPER.getPreferredName(), forecastUpper); + builder.field(FORECAST_PREDICTION.getPreferredName(), forecastPrediction); + builder.endObject(); + return builder; + } + + public String getJobId() { + return jobId; + } + + public long getForecastId() { + return forecastId; + } + + public String getId() { + int valuesHash = Objects.hash(byFieldValue, overFieldValue, partitionFieldValue); + int length = (byFieldValue == null ? 0 : byFieldValue.length()) + + (overFieldValue == null ? 0 : overFieldValue.length()) + + (partitionFieldValue == null ? 0 : partitionFieldValue.length()); + return jobId + "_model_forecast_" + forecastId + "_" + timestamp.getTime() + "_" + bucketSpan + "_" + + (modelFeature == null ? "" : modelFeature) + "_" + valuesHash + "_" + length; + } + + public Date getTimestamp() { + return timestamp; + } + + public long getBucketSpan() { + return bucketSpan; + } + + public String getPartitionFieldName() { + return partitionFieldName; + } + + public void setPartitionFieldName(String partitionFieldName) { + this.partitionFieldName = partitionFieldName; + } + + public String getPartitionFieldValue() { + return partitionFieldValue; + } + + public void setPartitionFieldValue(String partitionFieldValue) { + this.partitionFieldValue = partitionFieldValue; + } + + public String getOverFieldName() { + return overFieldName; + } + + public void setOverFieldName(String overFieldName) { + this.overFieldName = overFieldName; + } + + public String getOverFieldValue() { + return overFieldValue; + } + + public void setOverFieldValue(String overFieldValue) { + this.overFieldValue = overFieldValue; + } + + public String getByFieldName() { + return byFieldName; + } + + public void setByFieldName(String byFieldName) { + this.byFieldName = byFieldName; + } + + public String getByFieldValue() { + return byFieldValue; + } + + public void setByFieldValue(String byFieldValue) { + this.byFieldValue = byFieldValue; + } + + public String getModelFeature() { + return modelFeature; + } + + public void setModelFeature(String modelFeature) { + this.modelFeature = modelFeature; + } + + public double getForecastLower() { + return forecastLower; + } + + public void setForecastLower(double forecastLower) { + this.forecastLower = forecastLower; + } + + public double getForecastUpper() { + return forecastUpper; + } + + public void setForecastUpper(double forecastUpper) { + this.forecastUpper = forecastUpper; + } + + public double getForecastPrediction() { + return forecastPrediction; + } + + public void setForecastPrediction(double forecastPrediction) { + this.forecastPrediction = forecastPrediction; + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + if (other instanceof Forecast == false) { + return false; + } + Forecast that = (Forecast) other; + return Objects.equals(this.jobId, that.jobId) && + forecastId == that.forecastId && + Objects.equals(this.timestamp, that.timestamp) && + Objects.equals(this.partitionFieldValue, that.partitionFieldValue) && + Objects.equals(this.partitionFieldName, that.partitionFieldName) && + Objects.equals(this.overFieldValue, that.overFieldValue) && + Objects.equals(this.overFieldName, that.overFieldName) && + Objects.equals(this.byFieldValue, that.byFieldValue) && + Objects.equals(this.byFieldName, that.byFieldName) && + Objects.equals(this.modelFeature, that.modelFeature) && + this.forecastLower == that.forecastLower && + this.forecastUpper == that.forecastUpper && + this.forecastPrediction == that.forecastPrediction && + this.bucketSpan == that.bucketSpan; + } + + @Override + public int hashCode() { + return Objects.hash(jobId, forecastId, timestamp, partitionFieldName, partitionFieldValue, + overFieldName, overFieldValue, byFieldName, byFieldValue, + modelFeature, forecastLower, forecastUpper, forecastPrediction, bucketSpan); + } +} diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/ForecastStats.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/ForecastStats.java new file mode 100644 index 00000000000..9ec3122bc69 --- /dev/null +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/ForecastStats.java @@ -0,0 +1,114 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.job.results; + +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.xpack.ml.job.config.Job; + +import java.io.IOException; +import java.util.Objects; + +/** + * Model ForecastStats POJO. + * + * Note forecast stats are sent from the autodetect process but do not get + * indexed. + */ +public class ForecastStats implements ToXContentObject, Writeable { + /** + * Result type + */ + public static final String RESULT_TYPE_VALUE = "model_forecast_stats"; + public static final ParseField RESULTS_FIELD = new ParseField(RESULT_TYPE_VALUE); + + public static final ParseField FORECAST_ID = new ParseField("forecast_id"); + public static final ParseField RECORD_COUNT = new ParseField("forecast_record_count"); + + public static final ConstructingObjectParser PARSER = + new ConstructingObjectParser<>(RESULT_TYPE_VALUE, a -> new ForecastStats((String) a[0], (long) a[1])); + + static { + PARSER.declareString(ConstructingObjectParser.constructorArg(), Job.ID); + PARSER.declareLong(ConstructingObjectParser.constructorArg(), FORECAST_ID); + + PARSER.declareString((modelForecastStats, s) -> {}, Result.RESULT_TYPE); + PARSER.declareLong(ForecastStats::setRecordCount, RECORD_COUNT); + } + + private final String jobId; + private final long forecastId; + private long recordCount; + + public ForecastStats(String jobId, long forecastId) { + this.jobId = jobId; + this.forecastId = forecastId; + } + + public ForecastStats(StreamInput in) throws IOException { + jobId = in.readString(); + forecastId = in.readLong(); + recordCount = in.readLong(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(jobId); + out.writeLong(forecastId); + out.writeLong(recordCount); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(Job.ID.getPreferredName(), jobId); + builder.field(Result.RESULT_TYPE.getPreferredName(), RESULT_TYPE_VALUE); + builder.field(FORECAST_ID.getPreferredName(), forecastId); + builder.field(RECORD_COUNT.getPreferredName(), recordCount); + builder.endObject(); + return builder; + } + + public String getJobId() { + return jobId; + } + + public String getId() { + return jobId + "_model_forecast_stats"; + } + + public void setRecordCount(long recordCount) { + this.recordCount = recordCount; + } + + public double getRecordCount() { + return recordCount; + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + if (other instanceof ForecastStats == false) { + return false; + } + ForecastStats that = (ForecastStats) other; + return Objects.equals(this.jobId, that.jobId) && + this.forecastId == that.forecastId && + this.recordCount == that.recordCount; + } + + @Override + public int hashCode() { + return Objects.hash(jobId, forecastId, recordCount); + } +} diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/ReservedFieldNames.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/ReservedFieldNames.java index b4404e931f4..565951b20d3 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/ReservedFieldNames.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/ReservedFieldNames.java @@ -127,6 +127,10 @@ public final class ReservedFieldNames { ModelPlot.MODEL_UPPER.getPreferredName(), ModelPlot.MODEL_MEDIAN.getPreferredName(), ModelPlot.ACTUAL.getPreferredName(), + Forecast.FORECAST_LOWER.getPreferredName(), Forecast.FORECAST_UPPER.getPreferredName(), + Forecast.FORECAST_PREDICTION.getPreferredName(), + Forecast.FORECAST_ID.getPreferredName(), + ModelSizeStats.MODEL_BYTES_FIELD.getPreferredName(), ModelSizeStats.TOTAL_BY_FIELD_COUNT_FIELD.getPreferredName(), ModelSizeStats.TOTAL_OVER_FIELD_COUNT_FIELD.getPreferredName(), diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/job/RestForecastJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/job/RestForecastJobAction.java new file mode 100644 index 00000000000..7f80c1cce52 --- /dev/null +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/job/RestForecastJobAction.java @@ -0,0 +1,50 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.rest.job; + +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.xpack.ml.MachineLearning; +import org.elasticsearch.xpack.ml.action.ForecastJobAction; +import org.elasticsearch.xpack.ml.job.config.Job; + +import java.io.IOException; + +public class RestForecastJobAction extends BaseRestHandler { + + public RestForecastJobAction(Settings settings, RestController controller) { + super(settings); + controller.registerHandler(RestRequest.Method.POST, + MachineLearning.BASE_PATH + "anomaly_detectors/{" + Job.ID.getPreferredName() + "}/_forecast", this); + } + + @Override + public String getName() { + return "xpack_ml_forecast_job_action"; + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { + String jobId = restRequest.param(Job.ID.getPreferredName()); + final ForecastJobAction.Request request; + if (restRequest.hasContentOrSourceParam()) { + XContentParser parser = restRequest.contentOrSourceParamParser(); + request = ForecastJobAction.Request.parseRequest(jobId, parser); + } else { + request = new ForecastJobAction.Request(restRequest.param(Job.ID.getPreferredName())); + if (restRequest.hasParam(ForecastJobAction.Request.END_TIME.getPreferredName())) { + request.setEndTime(restRequest.param(ForecastJobAction.Request.END_TIME.getPreferredName())); + } + } + + return channel -> client.execute(ForecastJobAction.INSTANCE, request, new RestToXContentListener<>(channel)); + } +} diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/ForecastJobActionRequestTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/ForecastJobActionRequestTests.java new file mode 100644 index 00000000000..bc143b84406 --- /dev/null +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/ForecastJobActionRequestTests.java @@ -0,0 +1,35 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.action; + +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractStreamableXContentTestCase; +import org.elasticsearch.xpack.ml.action.ForecastJobAction.Request; + +public class ForecastJobActionRequestTests extends AbstractStreamableXContentTestCase { + + @Override + protected Request doParseInstance(XContentParser parser) { + return Request.parseRequest(null, parser); + } + + @Override + protected boolean supportsUnknownFields() { + return false; + } + + @Override + protected Request createTestInstance() { + Request request = new Request(randomAlphaOfLengthBetween(1, 20)); + return request; + } + + @Override + protected Request createBlankInstance() { + return new Request(); + } + +} diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/ForecastJobActionResponseTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/ForecastJobActionResponseTests.java new file mode 100644 index 00000000000..7933b49f3fd --- /dev/null +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/ForecastJobActionResponseTests.java @@ -0,0 +1,23 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.action; + +import org.elasticsearch.test.AbstractStreamableTestCase; +import org.elasticsearch.xpack.ml.action.ForecastJobAction.Response; + +public class ForecastJobActionResponseTests extends AbstractStreamableTestCase { + + @Override + protected Response createTestInstance() { + return new Response(randomBoolean(), randomNonNegativeLong()); + } + + @Override + protected Response createBlankInstance() { + return new Response(); + } + +} diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java index 5b7fbe3826f..35b7e28bb35 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java @@ -357,47 +357,47 @@ public class AutodetectResultProcessorIT extends XPackSingleNodeTestCase { private List results = new ArrayList<>(); ResultsBuilder addBucket(Bucket bucket) { - results.add(new AutodetectResult(Objects.requireNonNull(bucket), null, null, null, null, null, null, null, null)); + results.add(new AutodetectResult(Objects.requireNonNull(bucket), null, null, null, null, null, null, null, null, null, null)); return this; } ResultsBuilder addRecords(List records) { - results.add(new AutodetectResult(null, records, null, null, null, null, null, null, null)); + results.add(new AutodetectResult(null, records, null, null, null, null, null, null, null, null, null)); return this; } ResultsBuilder addInfluencers(List influencers) { - results.add(new AutodetectResult(null, null, influencers, null, null, null, null, null, null)); + results.add(new AutodetectResult(null, null, influencers, null, null, null, null, null, null, null, null)); return this; } ResultsBuilder addCategoryDefinition(CategoryDefinition categoryDefinition) { - results.add(new AutodetectResult(null, null, null, null, null, null, null, categoryDefinition, null)); + results.add(new AutodetectResult(null, null, null, null, null, null, null, null, null, categoryDefinition, null)); return this; } ResultsBuilder addmodelPlot(ModelPlot modelPlot) { - results.add(new AutodetectResult(null, null, null, null, null, null, modelPlot, null, null)); + results.add(new AutodetectResult(null, null, null, null, null, null, modelPlot, null, null, null, null)); return this; } ResultsBuilder addModelSizeStats(ModelSizeStats modelSizeStats) { - results.add(new AutodetectResult(null, null, null, null, null, modelSizeStats, null, null, null)); + results.add(new AutodetectResult(null, null, null, null, null, modelSizeStats, null, null, null, null, null)); return this; } ResultsBuilder addModelSnapshot(ModelSnapshot modelSnapshot) { - results.add(new AutodetectResult(null, null, null, null, modelSnapshot, null, null, null, null)); + results.add(new AutodetectResult(null, null, null, null, modelSnapshot, null, null, null, null, null, null)); return this; } ResultsBuilder addQuantiles(Quantiles quantiles) { - results.add(new AutodetectResult(null, null, null, quantiles, null, null, null, null, null)); + results.add(new AutodetectResult(null, null, null, quantiles, null, null, null, null, null, null, null)); return this; } ResultsBuilder addFlushAcknowledgement(FlushAcknowledgement flushAcknowledgement) { - results.add(new AutodetectResult(null, null, null, null, null, null, null, null, flushAcknowledgement)); + results.add(new AutodetectResult(null, null, null, null, null, null, null, null, null, null, flushAcknowledgement)); return this; } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/ForecastParamsTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/ForecastParamsTests.java new file mode 100644 index 00000000000..f2b59d6a103 --- /dev/null +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/ForecastParamsTests.java @@ -0,0 +1,44 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.job.process.autodetect.params; + +import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.ml.job.messages.Messages; + +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.lessThanOrEqualTo; + +public class ForecastParamsTests extends ESTestCase { + + private static ParseField END = new ParseField("end"); + + public void testDefault_GivesTomorrowTimeInSeconds() { + long nowSecs = System.currentTimeMillis() / 1000; + nowSecs += 60 * 60 * 24; + + ForecastParams params = ForecastParams.builder().build(); + assertThat(params.getEndTime(), greaterThanOrEqualTo(nowSecs)); + assertThat(params.getEndTime(), lessThanOrEqualTo(nowSecs +1)); + } + + public void test_UnparseableEndTimeThrows() { + ElasticsearchParseException e = + ESTestCase.expectThrows(ElasticsearchParseException.class, () -> ForecastParams.builder().endTime("bad", END).build()); + assertEquals(Messages.getMessage(Messages.REST_INVALID_DATETIME_PARAMS, "end", "bad"), e.getMessage()); + } + + public void testFormats() { + assertEquals(10L, ForecastParams.builder().endTime("10000", END).build().getEndTime()); + assertEquals(1462096800L, ForecastParams.builder().endTime("2016-05-01T10:00:00Z", END).build().getEndTime()); + + long nowSecs = System.currentTimeMillis() / 1000; + long end = ForecastParams.builder().endTime("now+2H", END).build().getEndTime(); + assertThat(end, greaterThanOrEqualTo(nowSecs + 7200)); + assertThat(end, lessThanOrEqualTo(nowSecs + 7200 +1)); + } +} diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/results/AutodetectResultTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/results/AutodetectResultTests.java index 1f9cb78f590..2090275f625 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/results/AutodetectResultTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/results/AutodetectResultTests.java @@ -35,6 +35,8 @@ public class AutodetectResultTests extends AbstractSerializingTestCase { + + @Override + protected ForecastStats parseInstance(XContentParser parser) { + return ForecastStats.PARSER.apply(parser, null); + } + + @Override + protected ForecastStats createTestInstance() { + return createTestInstance("ForecastStatsTest", randomNonNegativeLong()); + } + + public ForecastStats createTestInstance(String jobId, long forecastId) { + ForecastStats forecastStats = new ForecastStats(jobId, forecastId); + + if (randomBoolean()) { + forecastStats.setRecordCount(randomLong()); + } + + return forecastStats; + } + + @Override + protected Reader instanceReader() { + return ForecastStats::new; + } + + @Override + protected ForecastStats doParseInstance(XContentParser parser) throws IOException { + return ForecastStats.PARSER.apply(parser, null); + } + +} diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/results/ForecastTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/results/ForecastTests.java new file mode 100644 index 00000000000..97249ae4f76 --- /dev/null +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/results/ForecastTests.java @@ -0,0 +1,68 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.job.results; + +import org.elasticsearch.common.io.stream.Writeable.Reader; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractSerializingTestCase; + +import java.io.IOException; +import java.util.Date; + +public class ForecastTests extends AbstractSerializingTestCase { + + @Override + protected Forecast parseInstance(XContentParser parser) { + return Forecast.PARSER.apply(parser, null); + } + + @Override + protected Forecast createTestInstance() { + return createTestInstance("ForecastTest"); + } + + public Forecast createTestInstance(String jobId) { + Forecast forecast = new Forecast(jobId, randomNonNegativeLong(), new Date(randomLong()), randomNonNegativeLong()); + + if (randomBoolean()) { + forecast.setByFieldName(randomAlphaOfLengthBetween(1, 20)); + } + if (randomBoolean()) { + forecast.setByFieldValue(randomAlphaOfLengthBetween(1, 20)); + } + if (randomBoolean()) { + forecast.setPartitionFieldName(randomAlphaOfLengthBetween(1, 20)); + } + if (randomBoolean()) { + forecast.setPartitionFieldValue(randomAlphaOfLengthBetween(1, 20)); + } + if (randomBoolean()) { + forecast.setModelFeature(randomAlphaOfLengthBetween(1, 20)); + } + if (randomBoolean()) { + forecast.setForecastLower(randomDouble()); + } + if (randomBoolean()) { + forecast.setForecastUpper(randomDouble()); + } + if (randomBoolean()) { + forecast.setForecastPrediction(randomDouble()); + } + + return forecast; + } + + @Override + protected Reader instanceReader() { + return Forecast::new; + } + + @Override + protected Forecast doParseInstance(XContentParser parser) throws IOException { + return Forecast.PARSER.apply(parser, null); + } + +} diff --git a/plugin/src/test/resources/org/elasticsearch/transport/actions b/plugin/src/test/resources/org/elasticsearch/transport/actions index ef4937816c5..496745bd1c7 100644 --- a/plugin/src/test/resources/org/elasticsearch/transport/actions +++ b/plugin/src/test/resources/org/elasticsearch/transport/actions @@ -157,3 +157,4 @@ indices:data/write/update/byquery indices:data/write/delete/byquery indices:data/write/reindex cluster:admin/xpack/deprecation/info +cluster:admin/xpack/ml/job/forecast diff --git a/plugin/src/test/resources/rest-api-spec/api/xpack.ml.forecast.json b/plugin/src/test/resources/rest-api-spec/api/xpack.ml.forecast.json new file mode 100644 index 00000000000..04db5e2ecf2 --- /dev/null +++ b/plugin/src/test/resources/rest-api-spec/api/xpack.ml.forecast.json @@ -0,0 +1,24 @@ +{ + "xpack.ml.forecast": { + "methods": [ "POST" ], + "url": { + "path": "/_xpack/ml/anomaly_detectors/{job_id}/_forecast", + "paths": [ "/_xpack/ml/anomaly_detectors/{job_id}/_forecast" ], + "parts": { + "job_id": { + "type": "string", + "required": true, + "description": "The ID of the job to forecast for" + } + }, + "params": { + "end": { + "type": "string", + "required": false, + "description": "The end time of the forecast" + } + } + }, + "body": null + } +} diff --git a/plugin/src/test/resources/rest-api-spec/test/ml/forecast.yml b/plugin/src/test/resources/rest-api-spec/test/ml/forecast.yml new file mode 100644 index 00000000000..643af83a78f --- /dev/null +++ b/plugin/src/test/resources/rest-api-spec/test/ml/forecast.yml @@ -0,0 +1,41 @@ +setup: + - do: + xpack.ml.put_job: + job_id: forecast-job + body: > + { + "description":"A forecast job", + "analysis_config" : { + "detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}] + }, + "data_description" : { + "format":"xcontent" + } + } + +--- +"Test forecast unknown job": + - do: + catch: missing + xpack.ml.forecast: + job_id: "non-existing-job" + +--- +"Test forecast on closed job": + - do: + catch: /status_exception/ + xpack.ml.forecast: + job_id: "forecast-job" + +--- +"Test bad end param errors": + + - do: + xpack.ml.open_job: + job_id: "forecast-job" + + - do: + catch: /parse_exception/ + xpack.ml.forecast: + job_id: "forecast-job" + end: "tomorrow"