[ML-FC] Branch landing feature/ml (elastic/x-pack-elasticsearch#2500)

integrate forecasting feature branch into master

    - add endpoint xpack/ml/job/forecast to request forecasting on data of ml-jobs
       - current parameters: end time
    - persists forecast results into shared or own index
       - different runs are separated by a 'forecast id'

relates elastic/x-pack-elasticsearch#1838

Original commit: elastic/x-pack-elasticsearch@f9d701a6bc
This commit is contained in:
Hendrik Muhs 2017-09-14 12:31:20 +02:00 committed by GitHub
parent 4f3e740ba8
commit 7d19264363
27 changed files with 1302 additions and 23 deletions

View File

@ -44,6 +44,7 @@ import org.elasticsearch.xpack.ml.action.DeleteJobAction;
import org.elasticsearch.xpack.ml.action.DeleteModelSnapshotAction;
import org.elasticsearch.xpack.ml.action.FinalizeJobExecutionAction;
import org.elasticsearch.xpack.ml.action.FlushJobAction;
import org.elasticsearch.xpack.ml.action.ForecastJobAction;
import org.elasticsearch.xpack.ml.action.GetBucketsAction;
import org.elasticsearch.xpack.ml.action.GetCategoriesAction;
import org.elasticsearch.xpack.ml.action.GetDatafeedsAction;
@ -108,6 +109,7 @@ import org.elasticsearch.xpack.ml.rest.filter.RestPutFilterAction;
import org.elasticsearch.xpack.ml.rest.job.RestCloseJobAction;
import org.elasticsearch.xpack.ml.rest.job.RestDeleteJobAction;
import org.elasticsearch.xpack.ml.rest.job.RestFlushJobAction;
import org.elasticsearch.xpack.ml.rest.job.RestForecastJobAction;
import org.elasticsearch.xpack.ml.rest.job.RestGetJobStatsAction;
import org.elasticsearch.xpack.ml.rest.job.RestGetJobsAction;
import org.elasticsearch.xpack.ml.rest.job.RestOpenJobAction;
@ -383,7 +385,8 @@ public class MachineLearning implements ActionPlugin {
new RestStartDatafeedAction(settings, restController),
new RestStopDatafeedAction(settings, restController),
new RestDeleteModelSnapshotAction(settings, restController),
new RestDeleteExpiredDataAction(settings, restController)
new RestDeleteExpiredDataAction(settings, restController),
new RestForecastJobAction(settings, restController)
);
}
@ -431,7 +434,8 @@ public class MachineLearning implements ActionPlugin {
new ActionHandler<>(CompletionPersistentTaskAction.INSTANCE, CompletionPersistentTaskAction.TransportAction.class),
new ActionHandler<>(RemovePersistentTaskAction.INSTANCE, RemovePersistentTaskAction.TransportAction.class),
new ActionHandler<>(UpdateProcessAction.INSTANCE, UpdateProcessAction.TransportAction.class),
new ActionHandler<>(DeleteExpiredDataAction.INSTANCE, DeleteExpiredDataAction.TransportAction.class)
new ActionHandler<>(DeleteExpiredDataAction.INSTANCE, DeleteExpiredDataAction.TransportAction.class),
new ActionHandler<>(ForecastJobAction.INSTANCE, ForecastJobAction.TransportAction.class)
);
}

View File

@ -0,0 +1,234 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.ForecastParams;
import java.io.IOException;
import java.util.Objects;
import static org.elasticsearch.xpack.ml.action.ForecastJobAction.Request.END_TIME;
public class ForecastJobAction extends Action<ForecastJobAction.Request, ForecastJobAction.Response, ForecastJobAction.RequestBuilder> {
public static final ForecastJobAction INSTANCE = new ForecastJobAction();
public static final String NAME = "cluster:admin/xpack/ml/job/forecast";
private ForecastJobAction() {
super(NAME);
}
@Override
public RequestBuilder newRequestBuilder(ElasticsearchClient client) {
return new RequestBuilder(client, this);
}
@Override
public Response newResponse() {
return new Response();
}
public static class Request extends TransportJobTaskAction.JobTaskRequest<Request> implements ToXContentObject {
public static final ParseField END_TIME = new ParseField("end");
private static final ObjectParser<Request, Void> PARSER = new ObjectParser<>(NAME, Request::new);
static {
PARSER.declareString((request, jobId) -> request.jobId = jobId, Job.ID);
PARSER.declareString(Request::setEndTime, END_TIME);
}
public static Request parseRequest(String jobId, XContentParser parser) {
Request request = PARSER.apply(parser, null);
if (jobId != null) {
request.jobId = jobId;
}
return request;
}
private String endTime;
Request() {
}
public Request(String jobId) {
super(jobId);
}
public String getEndTime() {
return endTime;
}
public void setEndTime(String endTime) {
this.endTime = endTime;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
this.endTime = in.readOptionalString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeOptionalString(endTime);
}
@Override
public int hashCode() {
return Objects.hash(jobId, endTime);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
Request other = (Request) obj;
return Objects.equals(jobId, other.jobId) && Objects.equals(endTime, other.endTime);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(Job.ID.getPreferredName(), jobId);
if (endTime != null) {
builder.field(END_TIME.getPreferredName(), endTime);
}
builder.endObject();
return builder;
}
}
static class RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder> {
RequestBuilder(ElasticsearchClient client, ForecastJobAction action) {
super(client, action, new Request());
}
}
public static class Response extends BaseTasksResponse implements Writeable, ToXContentObject {
private boolean acknowledged;
private long id;
Response() {
super(null, null);
}
Response(boolean acknowledged, long id) {
super(null, null);
this.acknowledged = acknowledged;
this.id = id;
}
public boolean isacknowledged() {
return acknowledged;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
acknowledged = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(acknowledged);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("acknowledged", acknowledged);
builder.field("id", id);
builder.endObject();
return builder;
}
@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
Response response = (Response) o;
return acknowledged == response.acknowledged;
}
@Override
public int hashCode() {
return Objects.hash(acknowledged);
}
}
public static class TransportAction extends TransportJobTaskAction<Request, Response> {
@Inject
public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, ClusterService clusterService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
AutodetectProcessManager processManager) {
super(settings, ForecastJobAction.NAME, threadPool, clusterService, transportService, actionFilters,
indexNameExpressionResolver, ForecastJobAction.Request::new, ForecastJobAction.Response::new, ThreadPool.Names.SAME,
processManager);
// ThreadPool.Names.SAME, because operations is executed by
// autodetect worker thread
}
@Override
protected ForecastJobAction.Response readTaskResponse(StreamInput in) throws IOException {
Response response = new Response();
response.readFrom(in);
return response;
}
@Override
protected void taskOperation(Request request, OpenJobAction.JobTask task, ActionListener<Response> listener) {
ForecastParams.Builder paramsBuilder = ForecastParams.builder();
if (request.getEndTime() != null) {
paramsBuilder.endTime(request.getEndTime(), END_TIME);
}
ForecastParams params = paramsBuilder.build();
processManager.forecastJob(task, params, e -> {
if (e == null) {
listener.onResponse(new Response(true, params.getId()));
} else {
listener.onFailure(e);
}
});
}
}
}

View File

@ -17,6 +17,7 @@ import org.elasticsearch.xpack.ml.job.results.AnomalyRecord;
import org.elasticsearch.xpack.ml.job.results.Bucket;
import org.elasticsearch.xpack.ml.job.results.BucketInfluencer;
import org.elasticsearch.xpack.ml.job.results.CategoryDefinition;
import org.elasticsearch.xpack.ml.job.results.Forecast;
import org.elasticsearch.xpack.ml.job.results.Influence;
import org.elasticsearch.xpack.ml.job.results.Influencer;
import org.elasticsearch.xpack.ml.job.results.ModelPlot;
@ -289,6 +290,7 @@ public class ElasticsearchMappings {
.field(TYPE, DOUBLE)
.endObject();
addForecastFieldsToMapping(builder);
addAnomalyRecordFieldsToMapping(builder);
addInfluencerFieldsToMapping(builder);
addModelSizeStatsFieldsToMapping(builder);
@ -320,6 +322,24 @@ public class ElasticsearchMappings {
}
}
private static void addForecastFieldsToMapping(XContentBuilder builder) throws IOException {
// Forecast Output
builder.startObject(Forecast.FORECAST_LOWER.getPreferredName())
.field(TYPE, DOUBLE)
.endObject()
.startObject(Forecast.FORECAST_UPPER.getPreferredName())
.field(TYPE, DOUBLE)
.endObject()
.startObject(Forecast.FORECAST_PREDICTION.getPreferredName())
.field(TYPE, DOUBLE)
.endObject()
.startObject(Forecast.FORECAST_ID.getPreferredName())
.field(TYPE, LONG)
.endObject();
}
/**
* AnomalyRecord fields to be added under the 'properties' section of the mapping
* @param builder Add properties to this builder

View File

@ -29,6 +29,7 @@ import org.elasticsearch.xpack.ml.job.results.AnomalyRecord;
import org.elasticsearch.xpack.ml.job.results.Bucket;
import org.elasticsearch.xpack.ml.job.results.BucketInfluencer;
import org.elasticsearch.xpack.ml.job.results.CategoryDefinition;
import org.elasticsearch.xpack.ml.job.results.Forecast;
import org.elasticsearch.xpack.ml.job.results.Influencer;
import org.elasticsearch.xpack.ml.job.results.ModelPlot;
@ -151,6 +152,12 @@ public class JobResultsPersister extends AbstractComponent {
return this;
}
public Builder persistForecast(Forecast forecast) {
logger.trace("[{}] ES BULK ACTION: index forecast to index [{}] with ID [{}]", jobId, indexName, forecast.getId());
indexResult(forecast.getId(), forecast, Forecast.RESULT_TYPE_VALUE);
return this;
}
private void indexResult(String id, ToXContent resultDoc, String resultType) {
try (XContentBuilder content = toXContentBuilder(resultDoc)) {
bulkRequest.add(new IndexRequest(indexName, DOC_TYPE, id).source(content));

View File

@ -27,6 +27,7 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResult
import org.elasticsearch.xpack.ml.job.process.autodetect.output.FlushAcknowledgement;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.ForecastParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
@ -205,6 +206,13 @@ public class AutodetectCommunicator implements Closeable {
}, handler);
}
public void forecastJob(ForecastParams params, BiConsumer<Void, Exception> handler) {
submitOperation(() -> {
autodetectProcess.forecastJob(params);
return null;
}, handler);
}
@Nullable
FlushAcknowledgement waitFlushToCompletion(String flushId) {
LOGGER.debug("[{}] waiting for flush", job.getId());

View File

@ -10,6 +10,7 @@ import org.elasticsearch.xpack.ml.job.config.ModelPlotConfig;
import org.elasticsearch.xpack.ml.job.persistence.StateStreamer;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.ForecastParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.ml.job.results.AutodetectResult;
@ -85,6 +86,14 @@ public interface AutodetectProcess extends Closeable {
*/
String flushJob(FlushJobParams params) throws IOException;
/**
* Do a forecast on a running job.
*
* @param params The forecast parameters
* @throws IOException If the write fails
*/
void forecastJob(ForecastParams params) throws IOException;
/**
* Flush the output data stream
*/

View File

@ -42,6 +42,7 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.output.FlushAcknowledge
import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.ForecastParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
@ -49,6 +50,7 @@ import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerFactory;
import org.elasticsearch.xpack.ml.job.process.normalizer.Renormalizer;
import org.elasticsearch.xpack.ml.job.process.normalizer.ScoresUpdater;
import org.elasticsearch.xpack.ml.job.process.normalizer.ShortCircuitingRenormalizer;
import org.elasticsearch.xpack.ml.job.results.Forecast;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
@ -240,6 +242,33 @@ public class AutodetectProcessManager extends AbstractComponent {
});
}
/**
* Do a forecast for the running job.
*
* @param jobTask The job task
* @param params Forecast parameters
*/
public void forecastJob(JobTask jobTask, ForecastParams params, Consumer<Exception> handler) {
logger.debug("Forecasting job {}", jobTask.getJobId());
AutodetectCommunicator communicator = autoDetectCommunicatorByOpenJob.get(jobTask.getAllocationId());
if (communicator == null) {
String message = String.format(Locale.ROOT, "Cannot forecast because job [%s] is not open", jobTask.getJobId());
logger.debug(message);
handler.accept(ExceptionsHelper.conflictStatusException(message));
return;
}
communicator.forecastJob(params, (aVoid, e) -> {
if (e == null) {
handler.accept(null);
} else {
String msg = String.format(Locale.ROOT, "[%s] exception while forecasting job", jobTask.getJobId());
logger.error(msg, e);
handler.accept(ExceptionsHelper.serverError(msg, e));
}
});
}
public void writeUpdateProcessMessage(JobTask jobTask, List<JobUpdate.DetectorUpdate> updates, ModelPlotConfig config,
Consumer<Exception> handler) {
AutodetectCommunicator communicator = autoDetectCommunicatorByOpenJob.get(jobTask.getAllocationId());

View File

@ -11,6 +11,7 @@ import org.elasticsearch.xpack.ml.job.persistence.StateStreamer;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.FlushAcknowledgement;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.ForecastParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles;
import org.elasticsearch.xpack.ml.job.results.AutodetectResult;
@ -78,7 +79,7 @@ public class BlackHoleAutodetectProcess implements AutodetectProcess {
@Override
public String flushJob(FlushJobParams params) throws IOException {
FlushAcknowledgement flushAcknowledgement = new FlushAcknowledgement(FLUSH_ID, null);
AutodetectResult result = new AutodetectResult(null, null, null, null, null, null, null, null, flushAcknowledgement);
AutodetectResult result = new AutodetectResult(null, null, null, null, null, null, null, null, null, null, flushAcknowledgement);
results.add(result);
return FLUSH_ID;
}
@ -91,7 +92,7 @@ public class BlackHoleAutodetectProcess implements AutodetectProcess {
public void close() throws IOException {
if (open) {
Quantiles quantiles = new Quantiles(jobId, new Date(), "black hole quantiles");
AutodetectResult result = new AutodetectResult(null, null, null, quantiles, null, null, null, null, null);
AutodetectResult result = new AutodetectResult(null, null, null, quantiles, null, null, null, null, null, null, null);
results.add(result);
open = false;
}
@ -147,4 +148,8 @@ public class BlackHoleAutodetectProcess implements AutodetectProcess {
public String readError() {
return "";
}
@Override
public void forecastJob(ForecastParams params) throws IOException {
}
}

View File

@ -17,6 +17,7 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectResult
import org.elasticsearch.xpack.ml.job.process.autodetect.output.StateProcessor;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.ForecastParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.ml.job.process.autodetect.writer.ControlMsgToProcessWriter;
import org.elasticsearch.xpack.ml.job.process.autodetect.writer.LengthEncodedWriter;
@ -94,7 +95,9 @@ class NativeAutodetectProcess implements AutodetectProcess {
if (processCloseInitiated == false && processKilled == false) {
// The log message doesn't say "crashed", as the process could have been killed
// by a user or other process (e.g. the Linux OOM killer)
LOGGER.error("[{}] autodetect process stopped unexpectedly", jobId);
String errors = cppLogHandler.getErrors();
LOGGER.error("[{}] autodetect process stopped unexpectedly: {}", jobId, errors);
onProcessCrash.run();
}
}
@ -163,6 +166,12 @@ class NativeAutodetectProcess implements AutodetectProcess {
return writer.writeFlushMessage();
}
@Override
public void forecastJob(ForecastParams params) throws IOException {
ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(recordWriter, numberOfAnalysisFields);
writer.writeForecastMessage(params);
}
@Override
public void flushStream() throws IOException {
recordWriter.flush();

View File

@ -26,6 +26,8 @@ import org.elasticsearch.xpack.ml.job.results.AnomalyRecord;
import org.elasticsearch.xpack.ml.job.results.AutodetectResult;
import org.elasticsearch.xpack.ml.job.results.Bucket;
import org.elasticsearch.xpack.ml.job.results.CategoryDefinition;
import org.elasticsearch.xpack.ml.job.results.Forecast;
import org.elasticsearch.xpack.ml.job.results.ForecastStats;
import org.elasticsearch.xpack.ml.job.results.Influencer;
import org.elasticsearch.xpack.ml.job.results.ModelPlot;
@ -184,6 +186,20 @@ public class AutoDetectResultProcessor {
if (modelPlot != null) {
context.bulkResultsPersister.persistModelPlot(modelPlot);
}
Forecast forecast = result.getForecast();
if (forecast != null) {
context.bulkResultsPersister.persistForecast(forecast);
}
ForecastStats forecastStats = result.getForecastStats();
if (forecastStats != null) {
// forecast stats are send by autodetect but do not get persisted,
// still they mark the end of a forecast
LOGGER.trace("Received Forecast Stats [{}]", forecastStats.getId());
// forecast stats mark the end of a forecast, therefore commit whatever we have
context.bulkResultsPersister.executeRequest();
}
ModelSizeStats modelSizeStats = result.getModelSizeStats();
if (modelSizeStats != null) {
LOGGER.trace("[{}] Parsed ModelSizeStats: {} / {} / {} / {} / {} / {}",

View File

@ -0,0 +1,102 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.job.process.autodetect.params;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.joda.DateMathParser;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.xpack.ml.job.messages.Messages;
import java.util.Objects;
public class ForecastParams {
private final long endTime;
private final long id;
private ForecastParams(long id, long endTime) {
this.id = id;
this.endTime = endTime;
}
/**
* The forecast end time in seconds from the epoch
* @return The end time in seconds from the epoch
*/
public long getEndTime() {
return endTime;
}
/**
* The forecast id
*
* @return The forecast Id
*/
public long getId() {
return id;
}
@Override
public int hashCode() {
return Objects.hash(id, endTime);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
ForecastParams other = (ForecastParams) obj;
return Objects.equals(id, other.id) && Objects.equals(endTime, other.endTime);
}
public static Builder builder() {
return new Builder();
}
public static class Builder {
private long endTimeEpochSecs;
private long startTime;
private long forecastId;
private Builder() {
startTime = System.currentTimeMillis();
endTimeEpochSecs = tomorrow(startTime);
forecastId = generateId();
}
static long tomorrow(long now) {
return (now / 1000) + (60 * 60 * 24);
}
private long generateId() {
return startTime;
}
public Builder endTime(String endTime, ParseField paramName) {
DateMathParser dateMathParser = new DateMathParser(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER);
try {
endTimeEpochSecs = dateMathParser.parse(endTime, System::currentTimeMillis) / 1000;
} catch (Exception e) {
String msg = Messages.getMessage(Messages.REST_INVALID_DATETIME_PARAMS, paramName.getPreferredName(), endTime);
throw new ElasticsearchParseException(msg, e);
}
return this;
}
public ForecastParams build() {
return new ForecastParams(forecastId, endTimeEpochSecs);
}
}
}

View File

@ -7,11 +7,13 @@ package org.elasticsearch.xpack.ml.job.process.autodetect.writer;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.xpack.ml.job.config.DetectionRule;
import org.elasticsearch.xpack.ml.job.config.ModelPlotConfig;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.ForecastParams;
import java.io.IOException;
import java.io.OutputStream;
@ -37,6 +39,11 @@ public class ControlMsgToProcessWriter {
*/
private static final String FLUSH_MESSAGE_CODE = "f";
/**
* This must match the code defined in the api::CAnomalyDetector C++ class.
*/
private static final String FORECAST_MESSAGE_CODE = "p";
/**
* This must match the code defined in the api::CAnomalyDetector C++ class.
*/
@ -137,14 +144,32 @@ public class ControlMsgToProcessWriter {
String flushId = Long.toString(ms_FlushNumber.getAndIncrement());
writeMessage(FLUSH_MESSAGE_CODE + flushId);
char[] spaces = new char[FLUSH_SPACES_LENGTH];
Arrays.fill(spaces, ' ');
writeMessage(new String(spaces));
fillCommandBuffer();
lengthEncodedWriter.flush();
return flushId;
}
public void writeForecastMessage(ForecastParams params) throws IOException {
XContentBuilder builder = XContentFactory.jsonBuilder()
.startObject()
.field("forecast_id", params.getId())
.field("end_time", params.getEndTime())
.endObject();
writeMessage(FORECAST_MESSAGE_CODE + builder.string());
fillCommandBuffer();
lengthEncodedWriter.flush();
}
// todo(hendrikm): workaround, see
// https://github.com/elastic/machine-learning-cpp/issues/123
private void fillCommandBuffer() throws IOException {
char[] spaces = new char[FLUSH_SPACES_LENGTH];
Arrays.fill(spaces, ' ');
writeMessage(new String(spaces));
}
public void writeResetBucketsMessage(DataLoadParams params) throws IOException {
writeControlCodeFollowedByTimeRange(RESET_BUCKETS_MESSAGE_CODE, params.getStart(), params.getEnd());
}

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.ml.job.results;
import org.elasticsearch.Version;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -31,7 +32,7 @@ public class AutodetectResult implements ToXContentObject, Writeable {
TYPE.getPreferredName(), a -> new AutodetectResult((Bucket) a[0], (List<AnomalyRecord>) a[1], (List<Influencer>) a[2],
(Quantiles) a[3], a[4] == null ? null : ((ModelSnapshot.Builder) a[4]).build(),
a[5] == null ? null : ((ModelSizeStats.Builder) a[5]).build(),
(ModelPlot) a[6], (CategoryDefinition) a[7], (FlushAcknowledgement) a[8]));
(ModelPlot) a[6], (Forecast) a[7], (ForecastStats) a[8], (CategoryDefinition) a[9], (FlushAcknowledgement) a[10]));
static {
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), Bucket.PARSER, Bucket.RESULT_TYPE_FIELD);
@ -42,6 +43,8 @@ public class AutodetectResult implements ToXContentObject, Writeable {
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), ModelSizeStats.PARSER,
ModelSizeStats.RESULT_TYPE_FIELD);
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), ModelPlot.PARSER, ModelPlot.RESULTS_FIELD);
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), Forecast.PARSER, Forecast.RESULTS_FIELD);
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), ForecastStats.PARSER, ForecastStats.RESULTS_FIELD);
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), CategoryDefinition.PARSER, CategoryDefinition.TYPE);
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), FlushAcknowledgement.PARSER, FlushAcknowledgement.TYPE);
}
@ -53,12 +56,14 @@ public class AutodetectResult implements ToXContentObject, Writeable {
private final ModelSnapshot modelSnapshot;
private final ModelSizeStats modelSizeStats;
private final ModelPlot modelPlot;
private final Forecast forecast;
private final ForecastStats forecastStats;
private final CategoryDefinition categoryDefinition;
private final FlushAcknowledgement flushAcknowledgement;
public AutodetectResult(Bucket bucket, List<AnomalyRecord> records, List<Influencer> influencers, Quantiles quantiles,
ModelSnapshot modelSnapshot, ModelSizeStats modelSizeStats, ModelPlot modelPlot,
CategoryDefinition categoryDefinition, FlushAcknowledgement flushAcknowledgement) {
ModelSnapshot modelSnapshot, ModelSizeStats modelSizeStats, ModelPlot modelPlot, Forecast forecast,
ForecastStats forecastStats, CategoryDefinition categoryDefinition, FlushAcknowledgement flushAcknowledgement) {
this.bucket = bucket;
this.records = records;
this.influencers = influencers;
@ -66,6 +71,8 @@ public class AutodetectResult implements ToXContentObject, Writeable {
this.modelSnapshot = modelSnapshot;
this.modelSizeStats = modelSizeStats;
this.modelPlot = modelPlot;
this.forecast = forecast;
this.forecastStats = forecastStats;
this.categoryDefinition = categoryDefinition;
this.flushAcknowledgement = flushAcknowledgement;
}
@ -116,6 +123,22 @@ public class AutodetectResult implements ToXContentObject, Writeable {
} else {
this.flushAcknowledgement = null;
}
if (in.getVersion().onOrAfter(Version.V_6_1_0)) {
if (in.readBoolean()) {
this.forecast = new Forecast(in);
} else {
this.forecast = null;
}
if (in.readBoolean()) {
this.forecastStats = new ForecastStats(in);
} else {
this.forecastStats = null;
}
} else {
this.forecast = null;
this.forecastStats = null;
}
}
@Override
@ -129,6 +152,11 @@ public class AutodetectResult implements ToXContentObject, Writeable {
writeNullable(modelPlot, out);
writeNullable(categoryDefinition, out);
writeNullable(flushAcknowledgement, out);
if (out.getVersion().onOrAfter(Version.V_6_1_0)) {
writeNullable(forecast, out);
writeNullable(forecastStats, out);
}
}
private static void writeNullable(Writeable writeable, StreamOutput out) throws IOException {
@ -157,6 +185,8 @@ public class AutodetectResult implements ToXContentObject, Writeable {
addNullableField(ModelSnapshot.TYPE, modelSnapshot, builder);
addNullableField(ModelSizeStats.RESULT_TYPE_FIELD, modelSizeStats, builder);
addNullableField(ModelPlot.RESULTS_FIELD, modelPlot, builder);
addNullableField(Forecast.RESULTS_FIELD, forecast, builder);
addNullableField(ForecastStats.RESULTS_FIELD, forecastStats, builder);
addNullableField(CategoryDefinition.TYPE, categoryDefinition, builder);
addNullableField(FlushAcknowledgement.TYPE, flushAcknowledgement, builder);
builder.endObject();
@ -203,6 +233,14 @@ public class AutodetectResult implements ToXContentObject, Writeable {
return modelPlot;
}
public Forecast getForecast() {
return forecast;
}
public ForecastStats getForecastStats() {
return forecastStats;
}
public CategoryDefinition getCategoryDefinition() {
return categoryDefinition;
}
@ -213,8 +251,8 @@ public class AutodetectResult implements ToXContentObject, Writeable {
@Override
public int hashCode() {
return Objects.hash(bucket, records, influencers, categoryDefinition, flushAcknowledgement, modelPlot, modelSizeStats,
modelSnapshot, quantiles);
return Objects.hash(bucket, records, influencers, categoryDefinition, flushAcknowledgement, modelPlot, forecast, forecastStats,
modelSizeStats, modelSnapshot, quantiles);
}
@Override
@ -232,6 +270,8 @@ public class AutodetectResult implements ToXContentObject, Writeable {
Objects.equals(categoryDefinition, other.categoryDefinition) &&
Objects.equals(flushAcknowledgement, other.flushAcknowledgement) &&
Objects.equals(modelPlot, other.modelPlot) &&
Objects.equals(forecast, other.forecast) &&
Objects.equals(forecastStats, other.forecastStats) &&
Objects.equals(modelSizeStats, other.modelSizeStats) &&
Objects.equals(modelSnapshot, other.modelSnapshot) &&
Objects.equals(quantiles, other.quantiles);

View File

@ -0,0 +1,308 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.job.results;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser.ValueType;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser.Token;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.utils.time.TimeUtils;
import java.io.IOException;
import java.util.Date;
import java.util.Objects;
/**
* Model Forecast POJO.
*/
public class Forecast implements ToXContentObject, Writeable {
/**
* Result type
*/
public static final String RESULT_TYPE_VALUE = "model_forecast";
public static final ParseField RESULTS_FIELD = new ParseField(RESULT_TYPE_VALUE);
public static final ParseField FORECAST_ID = new ParseField("forecast_id");
public static final ParseField PARTITION_FIELD_NAME = new ParseField("partition_field_name");
public static final ParseField PARTITION_FIELD_VALUE = new ParseField("partition_field_value");
public static final ParseField OVER_FIELD_NAME = new ParseField("over_field_name");
public static final ParseField OVER_FIELD_VALUE = new ParseField("over_field_value");
public static final ParseField BY_FIELD_NAME = new ParseField("by_field_name");
public static final ParseField BY_FIELD_VALUE = new ParseField("by_field_value");
public static final ParseField MODEL_FEATURE = new ParseField("model_feature");
public static final ParseField FORECAST_LOWER = new ParseField("forecast_lower");
public static final ParseField FORECAST_UPPER = new ParseField("forecast_upper");
public static final ParseField FORECAST_PREDICTION = new ParseField("forecast_prediction");
public static final ParseField BUCKET_SPAN = new ParseField("bucket_span");
public static final ConstructingObjectParser<Forecast, Void> PARSER =
new ConstructingObjectParser<>(RESULT_TYPE_VALUE, a -> new Forecast((String) a[0], (long) a[1], (Date) a[2], (long) a[3]));
static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), Job.ID);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), FORECAST_ID);
PARSER.declareField(ConstructingObjectParser.constructorArg(), p -> {
if (p.currentToken() == Token.VALUE_NUMBER) {
return new Date(p.longValue());
} else if (p.currentToken() == Token.VALUE_STRING) {
return new Date(TimeUtils.dateStringToEpoch(p.text()));
}
throw new IllegalArgumentException("unexpected token [" + p.currentToken() + "] for ["
+ Result.TIMESTAMP.getPreferredName() + "]");
}, Result.TIMESTAMP, ValueType.VALUE);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), BUCKET_SPAN);
PARSER.declareString((modelForecast, s) -> {}, Result.RESULT_TYPE);
PARSER.declareString(Forecast::setPartitionFieldName, PARTITION_FIELD_NAME);
PARSER.declareString(Forecast::setPartitionFieldValue, PARTITION_FIELD_VALUE);
PARSER.declareString(Forecast::setOverFieldName, OVER_FIELD_NAME);
PARSER.declareString(Forecast::setOverFieldValue, OVER_FIELD_VALUE);
PARSER.declareString(Forecast::setByFieldName, BY_FIELD_NAME);
PARSER.declareString(Forecast::setByFieldValue, BY_FIELD_VALUE);
PARSER.declareString(Forecast::setModelFeature, MODEL_FEATURE);
PARSER.declareDouble(Forecast::setForecastLower, FORECAST_LOWER);
PARSER.declareDouble(Forecast::setForecastUpper, FORECAST_UPPER);
PARSER.declareDouble(Forecast::setForecastPrediction, FORECAST_PREDICTION);
}
private final String jobId;
private final long forecastId;
private final Date timestamp;
private final long bucketSpan;
private String partitionFieldName;
private String partitionFieldValue;
private String overFieldName;
private String overFieldValue;
private String byFieldName;
private String byFieldValue;
private String modelFeature;
private double forecastLower;
private double forecastUpper;
private double forecastPrediction;
public Forecast(String jobId, long forecastId, Date timestamp, long bucketSpan) {
this.jobId = jobId;
this.forecastId = forecastId;
this.timestamp = timestamp;
this.bucketSpan = bucketSpan;
}
public Forecast(StreamInput in) throws IOException {
jobId = in.readString();
forecastId = in.readLong();
timestamp = new Date(in.readLong());
partitionFieldName = in.readOptionalString();
partitionFieldValue = in.readOptionalString();
overFieldName = in.readOptionalString();
overFieldValue = in.readOptionalString();
byFieldName = in.readOptionalString();
byFieldValue = in.readOptionalString();
modelFeature = in.readOptionalString();
forecastLower = in.readDouble();
forecastUpper = in.readDouble();
forecastPrediction = in.readDouble();
bucketSpan = in.readLong();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(jobId);
out.writeLong(forecastId);
out.writeLong(timestamp.getTime());
out.writeOptionalString(partitionFieldName);
out.writeOptionalString(partitionFieldValue);
out.writeOptionalString(overFieldName);
out.writeOptionalString(overFieldValue);
out.writeOptionalString(byFieldName);
out.writeOptionalString(byFieldValue);
out.writeOptionalString(modelFeature);
out.writeDouble(forecastLower);
out.writeDouble(forecastUpper);
out.writeDouble(forecastPrediction);
out.writeLong(bucketSpan);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(Job.ID.getPreferredName(), jobId);
builder.field(FORECAST_ID.getPreferredName(), forecastId);
builder.field(Result.RESULT_TYPE.getPreferredName(), RESULT_TYPE_VALUE);
builder.field(BUCKET_SPAN.getPreferredName(), bucketSpan);
if (timestamp != null) {
builder.dateField(Result.TIMESTAMP.getPreferredName(),
Result.TIMESTAMP.getPreferredName() + "_string", timestamp.getTime());
}
if (partitionFieldName != null) {
builder.field(PARTITION_FIELD_NAME.getPreferredName(), partitionFieldName);
}
if (partitionFieldValue != null) {
builder.field(PARTITION_FIELD_VALUE.getPreferredName(), partitionFieldValue);
}
if (overFieldName != null) {
builder.field(OVER_FIELD_NAME.getPreferredName(), overFieldName);
}
if (overFieldValue != null) {
builder.field(OVER_FIELD_VALUE.getPreferredName(), overFieldValue);
}
if (byFieldName != null) {
builder.field(BY_FIELD_NAME.getPreferredName(), byFieldName);
}
if (byFieldValue != null) {
builder.field(BY_FIELD_VALUE.getPreferredName(), byFieldValue);
}
if (modelFeature != null) {
builder.field(MODEL_FEATURE.getPreferredName(), modelFeature);
}
builder.field(FORECAST_LOWER.getPreferredName(), forecastLower);
builder.field(FORECAST_UPPER.getPreferredName(), forecastUpper);
builder.field(FORECAST_PREDICTION.getPreferredName(), forecastPrediction);
builder.endObject();
return builder;
}
public String getJobId() {
return jobId;
}
public long getForecastId() {
return forecastId;
}
public String getId() {
int valuesHash = Objects.hash(byFieldValue, overFieldValue, partitionFieldValue);
int length = (byFieldValue == null ? 0 : byFieldValue.length()) +
(overFieldValue == null ? 0 : overFieldValue.length()) +
(partitionFieldValue == null ? 0 : partitionFieldValue.length());
return jobId + "_model_forecast_" + forecastId + "_" + timestamp.getTime() + "_" + bucketSpan + "_"
+ (modelFeature == null ? "" : modelFeature) + "_" + valuesHash + "_" + length;
}
public Date getTimestamp() {
return timestamp;
}
public long getBucketSpan() {
return bucketSpan;
}
public String getPartitionFieldName() {
return partitionFieldName;
}
public void setPartitionFieldName(String partitionFieldName) {
this.partitionFieldName = partitionFieldName;
}
public String getPartitionFieldValue() {
return partitionFieldValue;
}
public void setPartitionFieldValue(String partitionFieldValue) {
this.partitionFieldValue = partitionFieldValue;
}
public String getOverFieldName() {
return overFieldName;
}
public void setOverFieldName(String overFieldName) {
this.overFieldName = overFieldName;
}
public String getOverFieldValue() {
return overFieldValue;
}
public void setOverFieldValue(String overFieldValue) {
this.overFieldValue = overFieldValue;
}
public String getByFieldName() {
return byFieldName;
}
public void setByFieldName(String byFieldName) {
this.byFieldName = byFieldName;
}
public String getByFieldValue() {
return byFieldValue;
}
public void setByFieldValue(String byFieldValue) {
this.byFieldValue = byFieldValue;
}
public String getModelFeature() {
return modelFeature;
}
public void setModelFeature(String modelFeature) {
this.modelFeature = modelFeature;
}
public double getForecastLower() {
return forecastLower;
}
public void setForecastLower(double forecastLower) {
this.forecastLower = forecastLower;
}
public double getForecastUpper() {
return forecastUpper;
}
public void setForecastUpper(double forecastUpper) {
this.forecastUpper = forecastUpper;
}
public double getForecastPrediction() {
return forecastPrediction;
}
public void setForecastPrediction(double forecastPrediction) {
this.forecastPrediction = forecastPrediction;
}
@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
if (other instanceof Forecast == false) {
return false;
}
Forecast that = (Forecast) other;
return Objects.equals(this.jobId, that.jobId) &&
forecastId == that.forecastId &&
Objects.equals(this.timestamp, that.timestamp) &&
Objects.equals(this.partitionFieldValue, that.partitionFieldValue) &&
Objects.equals(this.partitionFieldName, that.partitionFieldName) &&
Objects.equals(this.overFieldValue, that.overFieldValue) &&
Objects.equals(this.overFieldName, that.overFieldName) &&
Objects.equals(this.byFieldValue, that.byFieldValue) &&
Objects.equals(this.byFieldName, that.byFieldName) &&
Objects.equals(this.modelFeature, that.modelFeature) &&
this.forecastLower == that.forecastLower &&
this.forecastUpper == that.forecastUpper &&
this.forecastPrediction == that.forecastPrediction &&
this.bucketSpan == that.bucketSpan;
}
@Override
public int hashCode() {
return Objects.hash(jobId, forecastId, timestamp, partitionFieldName, partitionFieldValue,
overFieldName, overFieldValue, byFieldName, byFieldValue,
modelFeature, forecastLower, forecastUpper, forecastPrediction, bucketSpan);
}
}

View File

@ -0,0 +1,114 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.job.results;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.ml.job.config.Job;
import java.io.IOException;
import java.util.Objects;
/**
* Model ForecastStats POJO.
*
* Note forecast stats are sent from the autodetect process but do not get
* indexed.
*/
public class ForecastStats implements ToXContentObject, Writeable {
/**
* Result type
*/
public static final String RESULT_TYPE_VALUE = "model_forecast_stats";
public static final ParseField RESULTS_FIELD = new ParseField(RESULT_TYPE_VALUE);
public static final ParseField FORECAST_ID = new ParseField("forecast_id");
public static final ParseField RECORD_COUNT = new ParseField("forecast_record_count");
public static final ConstructingObjectParser<ForecastStats, Void> PARSER =
new ConstructingObjectParser<>(RESULT_TYPE_VALUE, a -> new ForecastStats((String) a[0], (long) a[1]));
static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), Job.ID);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), FORECAST_ID);
PARSER.declareString((modelForecastStats, s) -> {}, Result.RESULT_TYPE);
PARSER.declareLong(ForecastStats::setRecordCount, RECORD_COUNT);
}
private final String jobId;
private final long forecastId;
private long recordCount;
public ForecastStats(String jobId, long forecastId) {
this.jobId = jobId;
this.forecastId = forecastId;
}
public ForecastStats(StreamInput in) throws IOException {
jobId = in.readString();
forecastId = in.readLong();
recordCount = in.readLong();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(jobId);
out.writeLong(forecastId);
out.writeLong(recordCount);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(Job.ID.getPreferredName(), jobId);
builder.field(Result.RESULT_TYPE.getPreferredName(), RESULT_TYPE_VALUE);
builder.field(FORECAST_ID.getPreferredName(), forecastId);
builder.field(RECORD_COUNT.getPreferredName(), recordCount);
builder.endObject();
return builder;
}
public String getJobId() {
return jobId;
}
public String getId() {
return jobId + "_model_forecast_stats";
}
public void setRecordCount(long recordCount) {
this.recordCount = recordCount;
}
public double getRecordCount() {
return recordCount;
}
@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
if (other instanceof ForecastStats == false) {
return false;
}
ForecastStats that = (ForecastStats) other;
return Objects.equals(this.jobId, that.jobId) &&
this.forecastId == that.forecastId &&
this.recordCount == that.recordCount;
}
@Override
public int hashCode() {
return Objects.hash(jobId, forecastId, recordCount);
}
}

View File

@ -127,6 +127,10 @@ public final class ReservedFieldNames {
ModelPlot.MODEL_UPPER.getPreferredName(), ModelPlot.MODEL_MEDIAN.getPreferredName(),
ModelPlot.ACTUAL.getPreferredName(),
Forecast.FORECAST_LOWER.getPreferredName(), Forecast.FORECAST_UPPER.getPreferredName(),
Forecast.FORECAST_PREDICTION.getPreferredName(),
Forecast.FORECAST_ID.getPreferredName(),
ModelSizeStats.MODEL_BYTES_FIELD.getPreferredName(),
ModelSizeStats.TOTAL_BY_FIELD_COUNT_FIELD.getPreferredName(),
ModelSizeStats.TOTAL_OVER_FIELD_COUNT_FIELD.getPreferredName(),

View File

@ -0,0 +1,50 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.rest.job;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.action.ForecastJobAction;
import org.elasticsearch.xpack.ml.job.config.Job;
import java.io.IOException;
public class RestForecastJobAction extends BaseRestHandler {
public RestForecastJobAction(Settings settings, RestController controller) {
super(settings);
controller.registerHandler(RestRequest.Method.POST,
MachineLearning.BASE_PATH + "anomaly_detectors/{" + Job.ID.getPreferredName() + "}/_forecast", this);
}
@Override
public String getName() {
return "xpack_ml_forecast_job_action";
}
@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
String jobId = restRequest.param(Job.ID.getPreferredName());
final ForecastJobAction.Request request;
if (restRequest.hasContentOrSourceParam()) {
XContentParser parser = restRequest.contentOrSourceParamParser();
request = ForecastJobAction.Request.parseRequest(jobId, parser);
} else {
request = new ForecastJobAction.Request(restRequest.param(Job.ID.getPreferredName()));
if (restRequest.hasParam(ForecastJobAction.Request.END_TIME.getPreferredName())) {
request.setEndTime(restRequest.param(ForecastJobAction.Request.END_TIME.getPreferredName()));
}
}
return channel -> client.execute(ForecastJobAction.INSTANCE, request, new RestToXContentListener<>(channel));
}
}

View File

@ -0,0 +1,35 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractStreamableXContentTestCase;
import org.elasticsearch.xpack.ml.action.ForecastJobAction.Request;
public class ForecastJobActionRequestTests extends AbstractStreamableXContentTestCase<Request> {
@Override
protected Request doParseInstance(XContentParser parser) {
return Request.parseRequest(null, parser);
}
@Override
protected boolean supportsUnknownFields() {
return false;
}
@Override
protected Request createTestInstance() {
Request request = new Request(randomAlphaOfLengthBetween(1, 20));
return request;
}
@Override
protected Request createBlankInstance() {
return new Request();
}
}

View File

@ -0,0 +1,23 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.test.AbstractStreamableTestCase;
import org.elasticsearch.xpack.ml.action.ForecastJobAction.Response;
public class ForecastJobActionResponseTests extends AbstractStreamableTestCase<Response> {
@Override
protected Response createTestInstance() {
return new Response(randomBoolean(), randomNonNegativeLong());
}
@Override
protected Response createBlankInstance() {
return new Response();
}
}

View File

@ -357,47 +357,47 @@ public class AutodetectResultProcessorIT extends XPackSingleNodeTestCase {
private List<AutodetectResult> results = new ArrayList<>();
ResultsBuilder addBucket(Bucket bucket) {
results.add(new AutodetectResult(Objects.requireNonNull(bucket), null, null, null, null, null, null, null, null));
results.add(new AutodetectResult(Objects.requireNonNull(bucket), null, null, null, null, null, null, null, null, null, null));
return this;
}
ResultsBuilder addRecords(List<AnomalyRecord> records) {
results.add(new AutodetectResult(null, records, null, null, null, null, null, null, null));
results.add(new AutodetectResult(null, records, null, null, null, null, null, null, null, null, null));
return this;
}
ResultsBuilder addInfluencers(List<Influencer> influencers) {
results.add(new AutodetectResult(null, null, influencers, null, null, null, null, null, null));
results.add(new AutodetectResult(null, null, influencers, null, null, null, null, null, null, null, null));
return this;
}
ResultsBuilder addCategoryDefinition(CategoryDefinition categoryDefinition) {
results.add(new AutodetectResult(null, null, null, null, null, null, null, categoryDefinition, null));
results.add(new AutodetectResult(null, null, null, null, null, null, null, null, null, categoryDefinition, null));
return this;
}
ResultsBuilder addmodelPlot(ModelPlot modelPlot) {
results.add(new AutodetectResult(null, null, null, null, null, null, modelPlot, null, null));
results.add(new AutodetectResult(null, null, null, null, null, null, modelPlot, null, null, null, null));
return this;
}
ResultsBuilder addModelSizeStats(ModelSizeStats modelSizeStats) {
results.add(new AutodetectResult(null, null, null, null, null, modelSizeStats, null, null, null));
results.add(new AutodetectResult(null, null, null, null, null, modelSizeStats, null, null, null, null, null));
return this;
}
ResultsBuilder addModelSnapshot(ModelSnapshot modelSnapshot) {
results.add(new AutodetectResult(null, null, null, null, modelSnapshot, null, null, null, null));
results.add(new AutodetectResult(null, null, null, null, modelSnapshot, null, null, null, null, null, null));
return this;
}
ResultsBuilder addQuantiles(Quantiles quantiles) {
results.add(new AutodetectResult(null, null, null, quantiles, null, null, null, null, null));
results.add(new AutodetectResult(null, null, null, quantiles, null, null, null, null, null, null, null));
return this;
}
ResultsBuilder addFlushAcknowledgement(FlushAcknowledgement flushAcknowledgement) {
results.add(new AutodetectResult(null, null, null, null, null, null, null, null, flushAcknowledgement));
results.add(new AutodetectResult(null, null, null, null, null, null, null, null, null, null, flushAcknowledgement));
return this;
}

View File

@ -0,0 +1,44 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.job.process.autodetect.params;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.job.messages.Messages;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
public class ForecastParamsTests extends ESTestCase {
private static ParseField END = new ParseField("end");
public void testDefault_GivesTomorrowTimeInSeconds() {
long nowSecs = System.currentTimeMillis() / 1000;
nowSecs += 60 * 60 * 24;
ForecastParams params = ForecastParams.builder().build();
assertThat(params.getEndTime(), greaterThanOrEqualTo(nowSecs));
assertThat(params.getEndTime(), lessThanOrEqualTo(nowSecs +1));
}
public void test_UnparseableEndTimeThrows() {
ElasticsearchParseException e =
ESTestCase.expectThrows(ElasticsearchParseException.class, () -> ForecastParams.builder().endTime("bad", END).build());
assertEquals(Messages.getMessage(Messages.REST_INVALID_DATETIME_PARAMS, "end", "bad"), e.getMessage());
}
public void testFormats() {
assertEquals(10L, ForecastParams.builder().endTime("10000", END).build().getEndTime());
assertEquals(1462096800L, ForecastParams.builder().endTime("2016-05-01T10:00:00Z", END).build().getEndTime());
long nowSecs = System.currentTimeMillis() / 1000;
long end = ForecastParams.builder().endTime("now+2H", END).build().getEndTime();
assertThat(end, greaterThanOrEqualTo(nowSecs + 7200));
assertThat(end, lessThanOrEqualTo(nowSecs + 7200 +1));
}
}

View File

@ -35,6 +35,8 @@ public class AutodetectResultTests extends AbstractSerializingTestCase<Autodetec
ModelSnapshot modelSnapshot;
ModelSizeStats.Builder modelSizeStats;
ModelPlot modelPlot;
Forecast forecast;
ForecastStats forecastStats;
CategoryDefinition categoryDefinition;
FlushAcknowledgement flushAcknowledgement;
String jobId = "foo";
@ -84,6 +86,16 @@ public class AutodetectResultTests extends AbstractSerializingTestCase<Autodetec
} else {
modelPlot = null;
}
if (randomBoolean()) {
forecast = new Forecast(jobId, randomNonNegativeLong(), new Date(randomLong()), randomNonNegativeLong());
} else {
forecast = null;
}
if (randomBoolean()) {
forecastStats = new ForecastStats(jobId, randomNonNegativeLong());
} else {
forecastStats = null;
}
if (randomBoolean()) {
categoryDefinition = new CategoryDefinition(jobId);
categoryDefinition.setCategoryId(randomLong());
@ -96,7 +108,8 @@ public class AutodetectResultTests extends AbstractSerializingTestCase<Autodetec
flushAcknowledgement = null;
}
return new AutodetectResult(bucket, records, influencers, quantiles, modelSnapshot,
modelSizeStats == null ? null : modelSizeStats.build(), modelPlot, categoryDefinition, flushAcknowledgement);
modelSizeStats == null ? null : modelSizeStats.build(), modelPlot, forecast, forecastStats, categoryDefinition,
flushAcknowledgement);
}
@Override

View File

@ -0,0 +1,46 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.job.results;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractSerializingTestCase;
import java.io.IOException;
public class ForecastStatsTests extends AbstractSerializingTestCase<ForecastStats> {
@Override
protected ForecastStats parseInstance(XContentParser parser) {
return ForecastStats.PARSER.apply(parser, null);
}
@Override
protected ForecastStats createTestInstance() {
return createTestInstance("ForecastStatsTest", randomNonNegativeLong());
}
public ForecastStats createTestInstance(String jobId, long forecastId) {
ForecastStats forecastStats = new ForecastStats(jobId, forecastId);
if (randomBoolean()) {
forecastStats.setRecordCount(randomLong());
}
return forecastStats;
}
@Override
protected Reader<ForecastStats> instanceReader() {
return ForecastStats::new;
}
@Override
protected ForecastStats doParseInstance(XContentParser parser) throws IOException {
return ForecastStats.PARSER.apply(parser, null);
}
}

View File

@ -0,0 +1,68 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.job.results;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractSerializingTestCase;
import java.io.IOException;
import java.util.Date;
public class ForecastTests extends AbstractSerializingTestCase<Forecast> {
@Override
protected Forecast parseInstance(XContentParser parser) {
return Forecast.PARSER.apply(parser, null);
}
@Override
protected Forecast createTestInstance() {
return createTestInstance("ForecastTest");
}
public Forecast createTestInstance(String jobId) {
Forecast forecast = new Forecast(jobId, randomNonNegativeLong(), new Date(randomLong()), randomNonNegativeLong());
if (randomBoolean()) {
forecast.setByFieldName(randomAlphaOfLengthBetween(1, 20));
}
if (randomBoolean()) {
forecast.setByFieldValue(randomAlphaOfLengthBetween(1, 20));
}
if (randomBoolean()) {
forecast.setPartitionFieldName(randomAlphaOfLengthBetween(1, 20));
}
if (randomBoolean()) {
forecast.setPartitionFieldValue(randomAlphaOfLengthBetween(1, 20));
}
if (randomBoolean()) {
forecast.setModelFeature(randomAlphaOfLengthBetween(1, 20));
}
if (randomBoolean()) {
forecast.setForecastLower(randomDouble());
}
if (randomBoolean()) {
forecast.setForecastUpper(randomDouble());
}
if (randomBoolean()) {
forecast.setForecastPrediction(randomDouble());
}
return forecast;
}
@Override
protected Reader<Forecast> instanceReader() {
return Forecast::new;
}
@Override
protected Forecast doParseInstance(XContentParser parser) throws IOException {
return Forecast.PARSER.apply(parser, null);
}
}

View File

@ -157,3 +157,4 @@ indices:data/write/update/byquery
indices:data/write/delete/byquery
indices:data/write/reindex
cluster:admin/xpack/deprecation/info
cluster:admin/xpack/ml/job/forecast

View File

@ -0,0 +1,24 @@
{
"xpack.ml.forecast": {
"methods": [ "POST" ],
"url": {
"path": "/_xpack/ml/anomaly_detectors/{job_id}/_forecast",
"paths": [ "/_xpack/ml/anomaly_detectors/{job_id}/_forecast" ],
"parts": {
"job_id": {
"type": "string",
"required": true,
"description": "The ID of the job to forecast for"
}
},
"params": {
"end": {
"type": "string",
"required": false,
"description": "The end time of the forecast"
}
}
},
"body": null
}
}

View File

@ -0,0 +1,41 @@
setup:
- do:
xpack.ml.put_job:
job_id: forecast-job
body: >
{
"description":"A forecast job",
"analysis_config" : {
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"data_description" : {
"format":"xcontent"
}
}
---
"Test forecast unknown job":
- do:
catch: missing
xpack.ml.forecast:
job_id: "non-existing-job"
---
"Test forecast on closed job":
- do:
catch: /status_exception/
xpack.ml.forecast:
job_id: "forecast-job"
---
"Test bad end param errors":
- do:
xpack.ml.open_job:
job_id: "forecast-job"
- do:
catch: /parse_exception/
xpack.ml.forecast:
job_id: "forecast-job"
end: "tomorrow"