[ML-FC] Forecast status message rebase (elastic/x-pack-elasticsearch#2936)
Re-enables persistence of the forecast request stats document and adds more fields to it. Depends on https://github.com/elastic/machine-learning-cpp/pull/382. Original commit: elastic/x-pack-elasticsearch@b6762005c0
This commit is contained in:
parent
244cfa7181
commit
fd392627a0
|
@ -18,6 +18,7 @@ import org.elasticsearch.xpack.ml.job.results.Bucket;
|
|||
import org.elasticsearch.xpack.ml.job.results.BucketInfluencer;
|
||||
import org.elasticsearch.xpack.ml.job.results.CategoryDefinition;
|
||||
import org.elasticsearch.xpack.ml.job.results.Forecast;
|
||||
import org.elasticsearch.xpack.ml.job.results.ForecastRequestStats;
|
||||
import org.elasticsearch.xpack.ml.job.results.Influence;
|
||||
import org.elasticsearch.xpack.ml.job.results.Influencer;
|
||||
import org.elasticsearch.xpack.ml.job.results.ModelPlot;
|
||||
|
@ -337,8 +338,25 @@ public class ElasticsearchMappings {
|
|||
.startObject(Forecast.FORECAST_ID.getPreferredName())
|
||||
.field(TYPE, LONG)
|
||||
.endObject();
|
||||
}
|
||||
|
||||
// Forecast Stats Output
|
||||
// re-used: PROCESSING_TIME_MS, PROCESSED_RECORD_COUNT, LATEST_RECORD_TIME
|
||||
builder.startObject(ForecastRequestStats.START_TIME.getPreferredName())
|
||||
.field(TYPE, DATE)
|
||||
.endObject()
|
||||
.startObject(ForecastRequestStats.END_TIME.getPreferredName())
|
||||
.field(TYPE, DATE)
|
||||
.endObject()
|
||||
.startObject(ForecastRequestStats.MESSAGE.getPreferredName())
|
||||
.field(TYPE, KEYWORD)
|
||||
.endObject()
|
||||
.startObject(ForecastRequestStats.PROGRESS.getPreferredName())
|
||||
.field(TYPE, DOUBLE)
|
||||
.endObject()
|
||||
.startObject(ForecastRequestStats.STATUS.getPreferredName())
|
||||
.field(TYPE, KEYWORD)
|
||||
.endObject();
|
||||
}
|
||||
|
||||
/**
|
||||
* AnomalyRecord fields to be added under the 'properties' section of the mapping
|
||||
|
|
|
@ -30,6 +30,7 @@ import org.elasticsearch.xpack.ml.job.results.Bucket;
|
|||
import org.elasticsearch.xpack.ml.job.results.BucketInfluencer;
|
||||
import org.elasticsearch.xpack.ml.job.results.CategoryDefinition;
|
||||
import org.elasticsearch.xpack.ml.job.results.Forecast;
|
||||
import org.elasticsearch.xpack.ml.job.results.ForecastRequestStats;
|
||||
import org.elasticsearch.xpack.ml.job.results.Influencer;
|
||||
import org.elasticsearch.xpack.ml.job.results.ModelPlot;
|
||||
|
||||
|
@ -158,6 +159,13 @@ public class JobResultsPersister extends AbstractComponent {
|
|||
return this;
|
||||
}
|
||||
|
||||
public Builder persistForecastRequestStats(ForecastRequestStats forecastRequestStats) {
|
||||
logger.trace("[{}] ES BULK ACTION: index forecast request stats to index [{}] with ID [{}]", jobId, indexName,
|
||||
forecastRequestStats.getId());
|
||||
indexResult(forecastRequestStats.getId(), forecastRequestStats, Forecast.RESULT_TYPE_VALUE);
|
||||
return this;
|
||||
}
|
||||
|
||||
private void indexResult(String id, ToXContent resultDoc, String resultType) {
|
||||
try (XContentBuilder content = toXContentBuilder(resultDoc)) {
|
||||
bulkRequest.add(new IndexRequest(indexName, DOC_TYPE, id).source(content));
|
||||
|
|
|
@ -26,7 +26,7 @@ import org.elasticsearch.xpack.ml.job.results.AutodetectResult;
|
|||
import org.elasticsearch.xpack.ml.job.results.Bucket;
|
||||
import org.elasticsearch.xpack.ml.job.results.CategoryDefinition;
|
||||
import org.elasticsearch.xpack.ml.job.results.Forecast;
|
||||
import org.elasticsearch.xpack.ml.job.results.ForecastStats;
|
||||
import org.elasticsearch.xpack.ml.job.results.ForecastRequestStats;
|
||||
import org.elasticsearch.xpack.ml.job.results.Influencer;
|
||||
import org.elasticsearch.xpack.ml.job.results.ModelPlot;
|
||||
|
||||
|
@ -195,16 +195,21 @@ public class AutoDetectResultProcessor {
|
|||
if (forecast != null) {
|
||||
context.bulkResultsPersister.persistForecast(forecast);
|
||||
}
|
||||
ForecastStats forecastStats = result.getForecastStats();
|
||||
if (forecastStats != null) {
|
||||
// forecast stats are send by autodetect but do not get persisted,
|
||||
// still they mark the end of a forecast
|
||||
ForecastRequestStats forecastRequestStats = result.getForecastRequestStats();
|
||||
if (forecastRequestStats != null) {
|
||||
LOGGER.trace("Received Forecast Stats [{}]", forecastRequestStats.getId());
|
||||
context.bulkResultsPersister.persistForecastRequestStats(forecastRequestStats);
|
||||
|
||||
LOGGER.trace("Received Forecast Stats [{}]", forecastStats.getId());
|
||||
double forecastProgress = forecastRequestStats.getProgress();
|
||||
|
||||
// forecast stats mark the end of a forecast, therefore commit whatever we have
|
||||
// persist if progress is 0 (probably some error condition) or 1 (finished),
|
||||
// otherwise rely on the count-based trigger
|
||||
if (forecastProgress == 0.0 || forecastProgress >= 1.0) {
|
||||
// if forecast stats progress is 1.0 it marks the end of a forecast,
|
||||
// therefore commit whatever we have
|
||||
context.bulkResultsPersister.executeRequest();
|
||||
}
|
||||
}
|
||||
ModelSizeStats modelSizeStats = result.getModelSizeStats();
|
||||
if (modelSizeStats != null) {
|
||||
LOGGER.trace("[{}] Parsed ModelSizeStats: {} / {} / {} / {} / {} / {}",
|
||||
|
|
|
@ -31,8 +31,8 @@ public class AutodetectResult implements ToXContentObject, Writeable {
|
|||
public static final ConstructingObjectParser<AutodetectResult, Void> PARSER = new ConstructingObjectParser<>(
|
||||
TYPE.getPreferredName(), a -> new AutodetectResult((Bucket) a[0], (List<AnomalyRecord>) a[1], (List<Influencer>) a[2],
|
||||
(Quantiles) a[3], a[4] == null ? null : ((ModelSnapshot.Builder) a[4]).build(),
|
||||
a[5] == null ? null : ((ModelSizeStats.Builder) a[5]).build(),
|
||||
(ModelPlot) a[6], (Forecast) a[7], (ForecastStats) a[8], (CategoryDefinition) a[9], (FlushAcknowledgement) a[10]));
|
||||
a[5] == null ? null : ((ModelSizeStats.Builder) a[5]).build(), (ModelPlot) a[6],
|
||||
(Forecast) a[7], (ForecastRequestStats) a[8], (CategoryDefinition) a[9], (FlushAcknowledgement) a[10]));
|
||||
|
||||
static {
|
||||
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), Bucket.PARSER, Bucket.RESULT_TYPE_FIELD);
|
||||
|
@ -44,7 +44,8 @@ public class AutodetectResult implements ToXContentObject, Writeable {
|
|||
ModelSizeStats.RESULT_TYPE_FIELD);
|
||||
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), ModelPlot.PARSER, ModelPlot.RESULTS_FIELD);
|
||||
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), Forecast.PARSER, Forecast.RESULTS_FIELD);
|
||||
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), ForecastStats.PARSER, ForecastStats.RESULTS_FIELD);
|
||||
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), ForecastRequestStats.PARSER,
|
||||
ForecastRequestStats.RESULTS_FIELD);
|
||||
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), CategoryDefinition.PARSER, CategoryDefinition.TYPE);
|
||||
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), FlushAcknowledgement.PARSER, FlushAcknowledgement.TYPE);
|
||||
}
|
||||
|
@ -57,13 +58,13 @@ public class AutodetectResult implements ToXContentObject, Writeable {
|
|||
private final ModelSizeStats modelSizeStats;
|
||||
private final ModelPlot modelPlot;
|
||||
private final Forecast forecast;
|
||||
private final ForecastStats forecastStats;
|
||||
private final ForecastRequestStats forecastRequestStats;
|
||||
private final CategoryDefinition categoryDefinition;
|
||||
private final FlushAcknowledgement flushAcknowledgement;
|
||||
|
||||
public AutodetectResult(Bucket bucket, List<AnomalyRecord> records, List<Influencer> influencers, Quantiles quantiles,
|
||||
ModelSnapshot modelSnapshot, ModelSizeStats modelSizeStats, ModelPlot modelPlot, Forecast forecast,
|
||||
ForecastStats forecastStats, CategoryDefinition categoryDefinition, FlushAcknowledgement flushAcknowledgement) {
|
||||
ForecastRequestStats forecastRequestStats, CategoryDefinition categoryDefinition, FlushAcknowledgement flushAcknowledgement) {
|
||||
this.bucket = bucket;
|
||||
this.records = records;
|
||||
this.influencers = influencers;
|
||||
|
@ -72,7 +73,7 @@ public class AutodetectResult implements ToXContentObject, Writeable {
|
|||
this.modelSizeStats = modelSizeStats;
|
||||
this.modelPlot = modelPlot;
|
||||
this.forecast = forecast;
|
||||
this.forecastStats = forecastStats;
|
||||
this.forecastRequestStats = forecastRequestStats;
|
||||
this.categoryDefinition = categoryDefinition;
|
||||
this.flushAcknowledgement = flushAcknowledgement;
|
||||
}
|
||||
|
@ -131,13 +132,13 @@ public class AutodetectResult implements ToXContentObject, Writeable {
|
|||
this.forecast = null;
|
||||
}
|
||||
if (in.readBoolean()) {
|
||||
this.forecastStats = new ForecastStats(in);
|
||||
this.forecastRequestStats = new ForecastRequestStats(in);
|
||||
} else {
|
||||
this.forecastStats = null;
|
||||
this.forecastRequestStats = null;
|
||||
}
|
||||
} else {
|
||||
this.forecast = null;
|
||||
this.forecastStats = null;
|
||||
this.forecastRequestStats = null;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -155,7 +156,7 @@ public class AutodetectResult implements ToXContentObject, Writeable {
|
|||
|
||||
if (out.getVersion().onOrAfter(Version.V_6_1_0)) {
|
||||
writeNullable(forecast, out);
|
||||
writeNullable(forecastStats, out);
|
||||
writeNullable(forecastRequestStats, out);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -186,7 +187,7 @@ public class AutodetectResult implements ToXContentObject, Writeable {
|
|||
addNullableField(ModelSizeStats.RESULT_TYPE_FIELD, modelSizeStats, builder);
|
||||
addNullableField(ModelPlot.RESULTS_FIELD, modelPlot, builder);
|
||||
addNullableField(Forecast.RESULTS_FIELD, forecast, builder);
|
||||
addNullableField(ForecastStats.RESULTS_FIELD, forecastStats, builder);
|
||||
addNullableField(ForecastRequestStats.RESULTS_FIELD, forecastRequestStats, builder);
|
||||
addNullableField(CategoryDefinition.TYPE, categoryDefinition, builder);
|
||||
addNullableField(FlushAcknowledgement.TYPE, flushAcknowledgement, builder);
|
||||
builder.endObject();
|
||||
|
@ -237,8 +238,8 @@ public class AutodetectResult implements ToXContentObject, Writeable {
|
|||
return forecast;
|
||||
}
|
||||
|
||||
public ForecastStats getForecastStats() {
|
||||
return forecastStats;
|
||||
public ForecastRequestStats getForecastRequestStats() {
|
||||
return forecastRequestStats;
|
||||
}
|
||||
|
||||
public CategoryDefinition getCategoryDefinition() {
|
||||
|
@ -251,8 +252,8 @@ public class AutodetectResult implements ToXContentObject, Writeable {
|
|||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(bucket, records, influencers, categoryDefinition, flushAcknowledgement, modelPlot, forecast, forecastStats,
|
||||
modelSizeStats, modelSnapshot, quantiles);
|
||||
return Objects.hash(bucket, records, influencers, categoryDefinition, flushAcknowledgement, modelPlot, forecast,
|
||||
forecastRequestStats, modelSizeStats, modelSnapshot, quantiles);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -271,7 +272,7 @@ public class AutodetectResult implements ToXContentObject, Writeable {
|
|||
Objects.equals(flushAcknowledgement, other.flushAcknowledgement) &&
|
||||
Objects.equals(modelPlot, other.modelPlot) &&
|
||||
Objects.equals(forecast, other.forecast) &&
|
||||
Objects.equals(forecastStats, other.forecastStats) &&
|
||||
Objects.equals(forecastRequestStats, other.forecastRequestStats) &&
|
||||
Objects.equals(modelSizeStats, other.modelSizeStats) &&
|
||||
Objects.equals(modelSnapshot, other.modelSnapshot) &&
|
||||
Objects.equals(quantiles, other.quantiles);
|
||||
|
|
|
@ -0,0 +1,252 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.ml.job.results;
|
||||
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
|
||||
import org.elasticsearch.common.xcontent.ToXContentObject;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.ObjectParser.ValueType;
|
||||
import org.elasticsearch.xpack.ml.job.config.Job;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.time.Instant;
|
||||
import java.util.Locale;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* Model ForecastRequestStats POJO.
|
||||
*
|
||||
* This information is produced by the autodetect process and contains
|
||||
* information about errors, progress and counters. There is exactly 1 document
|
||||
* per forecast request, getting updated while the request is processed.
|
||||
*/
|
||||
public class ForecastRequestStats implements ToXContentObject, Writeable {
|
||||
/**
|
||||
* Result type
|
||||
*/
|
||||
public static final String RESULT_TYPE_VALUE = "model_forecast_request_stats";
|
||||
|
||||
public static final ParseField RESULTS_FIELD = new ParseField(RESULT_TYPE_VALUE);
|
||||
public static final ParseField FORECAST_ID = new ParseField("forecast_id");
|
||||
public static final ParseField START_TIME = new ParseField("forecast_start_timestamp");
|
||||
public static final ParseField END_TIME = new ParseField("forecast_end_timestamp");
|
||||
public static final ParseField MESSAGE = new ParseField("forecast_message");
|
||||
public static final ParseField PROCESSING_TIME_MS = new ParseField("processing_time_ms");
|
||||
public static final ParseField PROGRESS = new ParseField("forecast_progress");
|
||||
public static final ParseField PROCESSED_RECORD_COUNT = new ParseField("processed_record_count");
|
||||
public static final ParseField STATUS = new ParseField("forecast_status");
|
||||
|
||||
public static final ConstructingObjectParser<ForecastRequestStats, Void> PARSER =
|
||||
new ConstructingObjectParser<>(RESULT_TYPE_VALUE, a -> new ForecastRequestStats((String) a[0], (long) a[1]));
|
||||
|
||||
static {
|
||||
PARSER.declareString(ConstructingObjectParser.constructorArg(), Job.ID);
|
||||
PARSER.declareLong(ConstructingObjectParser.constructorArg(), FORECAST_ID);
|
||||
|
||||
PARSER.declareString((modelForecastRequestStats, s) -> {}, Result.RESULT_TYPE);
|
||||
PARSER.declareLong(ForecastRequestStats::setRecordCount, PROCESSED_RECORD_COUNT);
|
||||
PARSER.declareString(ForecastRequestStats::setMessage, MESSAGE);
|
||||
PARSER.declareField(ForecastRequestStats::setStartTimeStamp,
|
||||
p -> Instant.ofEpochMilli(p.longValue()), START_TIME, ValueType.LONG);
|
||||
PARSER.declareField(ForecastRequestStats::setEndTimeStamp,
|
||||
p -> Instant.ofEpochMilli(p.longValue()), END_TIME, ValueType.LONG);
|
||||
PARSER.declareDouble(ForecastRequestStats::setProgress, PROGRESS);
|
||||
PARSER.declareLong(ForecastRequestStats::setProcessingTime, PROCESSING_TIME_MS);
|
||||
PARSER.declareField(ForecastRequestStats::setStatus, p -> ForecastRequestStatus.fromString(p.text()), STATUS, ValueType.STRING);
|
||||
}
|
||||
|
||||
public enum ForecastRequestStatus implements Writeable {
|
||||
OK, FAILED, STOPPED, STARTED, FINISHED, SCHEDULED;
|
||||
|
||||
public static ForecastRequestStatus fromString(String statusName) {
|
||||
return valueOf(statusName.trim().toUpperCase(Locale.ROOT));
|
||||
}
|
||||
|
||||
public static ForecastRequestStatus readFromStream(StreamInput in) throws IOException {
|
||||
return in.readEnum(ForecastRequestStatus.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeEnum(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return name().toLowerCase(Locale.ROOT);
|
||||
}
|
||||
}
|
||||
|
||||
private final String jobId;
|
||||
private final long forecastId;
|
||||
private long recordCount;
|
||||
private String message;
|
||||
private Instant dateStarted = Instant.EPOCH;
|
||||
private Instant dateEnded = Instant.EPOCH;
|
||||
private double progress;
|
||||
private long processingTime;
|
||||
private ForecastRequestStatus status = ForecastRequestStatus.OK;
|
||||
|
||||
public ForecastRequestStats(String jobId, long forecastId) {
|
||||
this.jobId = jobId;
|
||||
this.forecastId = forecastId;
|
||||
}
|
||||
|
||||
public ForecastRequestStats(StreamInput in) throws IOException {
|
||||
jobId = in.readString();
|
||||
forecastId = in.readLong();
|
||||
recordCount = in.readLong();
|
||||
message = in.readOptionalString();
|
||||
dateStarted = Instant.ofEpochMilli(in.readVLong());
|
||||
dateEnded = Instant.ofEpochMilli(in.readVLong());
|
||||
progress = in.readDouble();
|
||||
processingTime = in.readLong();
|
||||
status = ForecastRequestStatus.readFromStream(in);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeString(jobId);
|
||||
out.writeLong(forecastId);
|
||||
out.writeLong(recordCount);
|
||||
out.writeOptionalString(message);
|
||||
out.writeVLong(dateStarted.toEpochMilli());
|
||||
out.writeVLong(dateEnded.toEpochMilli());
|
||||
out.writeDouble(progress);
|
||||
out.writeLong(processingTime);
|
||||
status.writeTo(out);
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject();
|
||||
builder.field(Job.ID.getPreferredName(), jobId);
|
||||
builder.field(Result.RESULT_TYPE.getPreferredName(), RESULT_TYPE_VALUE);
|
||||
builder.field(FORECAST_ID.getPreferredName(), forecastId);
|
||||
builder.field(PROCESSED_RECORD_COUNT.getPreferredName(), recordCount);
|
||||
if (message != null) {
|
||||
builder.field(MESSAGE.getPreferredName(), message);
|
||||
}
|
||||
if (dateStarted.equals(Instant.EPOCH) == false) {
|
||||
builder.field(START_TIME.getPreferredName(), dateStarted.toEpochMilli());
|
||||
}
|
||||
if (dateEnded.equals(Instant.EPOCH) == false) {
|
||||
builder.field(END_TIME.getPreferredName(), dateEnded.toEpochMilli());
|
||||
}
|
||||
builder.field(PROGRESS.getPreferredName(), progress);
|
||||
builder.field(PROCESSING_TIME_MS.getPreferredName(), processingTime);
|
||||
builder.field(STATUS.getPreferredName(), status);
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
||||
public String getJobId() {
|
||||
return jobId;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the document ID used for indexing. As there is 1 and only 1 document
|
||||
* per forecast request, the id has no dynamic parts.
|
||||
*
|
||||
* @return id
|
||||
*/
|
||||
public String getId() {
|
||||
return jobId + "_model_forecast_request_stats_" + forecastId;
|
||||
}
|
||||
|
||||
public void setRecordCount(long recordCount) {
|
||||
this.recordCount = recordCount;
|
||||
}
|
||||
|
||||
public double getRecordCount() {
|
||||
return recordCount;
|
||||
}
|
||||
|
||||
public String getMessage() {
|
||||
return message;
|
||||
}
|
||||
|
||||
public void setMessage(String message) {
|
||||
this.message = message;
|
||||
}
|
||||
|
||||
public Instant getDateStarted() {
|
||||
return dateStarted;
|
||||
}
|
||||
|
||||
public void setStartTimeStamp(Instant dateStarted) {
|
||||
this.dateStarted = dateStarted;
|
||||
}
|
||||
|
||||
public Instant getDateEnded() {
|
||||
return dateEnded;
|
||||
}
|
||||
|
||||
public void setEndTimeStamp(Instant dateEnded) {
|
||||
this.dateEnded = dateEnded;
|
||||
}
|
||||
|
||||
/**
|
||||
* Progress information of the ForecastRequest in the range 0 to 1,
|
||||
* while 1 means finished
|
||||
*
|
||||
* @return progress value
|
||||
*/
|
||||
public double getProgress() {
|
||||
return progress;
|
||||
}
|
||||
|
||||
public void setProgress(double progress) {
|
||||
this.progress = progress;
|
||||
}
|
||||
|
||||
public long getProcessingTime() {
|
||||
return processingTime;
|
||||
}
|
||||
|
||||
public void setProcessingTime(long processingTime) {
|
||||
this.processingTime = processingTime;
|
||||
}
|
||||
|
||||
public ForecastRequestStatus getStatus() {
|
||||
return status;
|
||||
}
|
||||
|
||||
public void setStatus(ForecastRequestStatus jobStatus) {
|
||||
Objects.requireNonNull(jobStatus, "[" + STATUS.getPreferredName() + "] must not be null");
|
||||
this.status = jobStatus;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object other) {
|
||||
if (this == other) {
|
||||
return true;
|
||||
}
|
||||
if (other instanceof ForecastRequestStats == false) {
|
||||
return false;
|
||||
}
|
||||
ForecastRequestStats that = (ForecastRequestStats) other;
|
||||
return Objects.equals(this.jobId, that.jobId) &&
|
||||
this.forecastId == that.forecastId &&
|
||||
this.recordCount == that.recordCount &&
|
||||
Objects.equals(this.message, that.message) &&
|
||||
Objects.equals(this.dateStarted, that.dateStarted) &&
|
||||
Objects.equals(this.dateEnded, that.dateEnded) &&
|
||||
this.progress == that.progress &&
|
||||
this.processingTime == that.processingTime &&
|
||||
Objects.equals(this.status, that.status);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(jobId, forecastId, recordCount, message, dateStarted, dateEnded, progress,
|
||||
processingTime, status);
|
||||
}
|
||||
}
|
|
@ -1,114 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.ml.job.results;
|
||||
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
|
||||
import org.elasticsearch.common.xcontent.ToXContentObject;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.xpack.ml.job.config.Job;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* Model ForecastStats POJO.
|
||||
*
|
||||
* Note forecast stats are sent from the autodetect process but do not get
|
||||
* indexed.
|
||||
*/
|
||||
public class ForecastStats implements ToXContentObject, Writeable {
|
||||
/**
|
||||
* Result type
|
||||
*/
|
||||
public static final String RESULT_TYPE_VALUE = "model_forecast_stats";
|
||||
public static final ParseField RESULTS_FIELD = new ParseField(RESULT_TYPE_VALUE);
|
||||
|
||||
public static final ParseField FORECAST_ID = new ParseField("forecast_id");
|
||||
public static final ParseField RECORD_COUNT = new ParseField("forecast_record_count");
|
||||
|
||||
public static final ConstructingObjectParser<ForecastStats, Void> PARSER =
|
||||
new ConstructingObjectParser<>(RESULT_TYPE_VALUE, a -> new ForecastStats((String) a[0], (long) a[1]));
|
||||
|
||||
static {
|
||||
PARSER.declareString(ConstructingObjectParser.constructorArg(), Job.ID);
|
||||
PARSER.declareLong(ConstructingObjectParser.constructorArg(), FORECAST_ID);
|
||||
|
||||
PARSER.declareString((modelForecastStats, s) -> {}, Result.RESULT_TYPE);
|
||||
PARSER.declareLong(ForecastStats::setRecordCount, RECORD_COUNT);
|
||||
}
|
||||
|
||||
private final String jobId;
|
||||
private final long forecastId;
|
||||
private long recordCount;
|
||||
|
||||
public ForecastStats(String jobId, long forecastId) {
|
||||
this.jobId = jobId;
|
||||
this.forecastId = forecastId;
|
||||
}
|
||||
|
||||
public ForecastStats(StreamInput in) throws IOException {
|
||||
jobId = in.readString();
|
||||
forecastId = in.readLong();
|
||||
recordCount = in.readLong();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeString(jobId);
|
||||
out.writeLong(forecastId);
|
||||
out.writeLong(recordCount);
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject();
|
||||
builder.field(Job.ID.getPreferredName(), jobId);
|
||||
builder.field(Result.RESULT_TYPE.getPreferredName(), RESULT_TYPE_VALUE);
|
||||
builder.field(FORECAST_ID.getPreferredName(), forecastId);
|
||||
builder.field(RECORD_COUNT.getPreferredName(), recordCount);
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
||||
public String getJobId() {
|
||||
return jobId;
|
||||
}
|
||||
|
||||
public String getId() {
|
||||
return jobId + "_model_forecast_stats";
|
||||
}
|
||||
|
||||
public void setRecordCount(long recordCount) {
|
||||
this.recordCount = recordCount;
|
||||
}
|
||||
|
||||
public double getRecordCount() {
|
||||
return recordCount;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object other) {
|
||||
if (this == other) {
|
||||
return true;
|
||||
}
|
||||
if (other instanceof ForecastStats == false) {
|
||||
return false;
|
||||
}
|
||||
ForecastStats that = (ForecastStats) other;
|
||||
return Objects.equals(this.jobId, that.jobId) &&
|
||||
this.forecastId == that.forecastId &&
|
||||
this.recordCount == that.recordCount;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(jobId, forecastId, recordCount);
|
||||
}
|
||||
}
|
|
@ -131,6 +131,12 @@ public final class ReservedFieldNames {
|
|||
Forecast.FORECAST_PREDICTION.getPreferredName(),
|
||||
Forecast.FORECAST_ID.getPreferredName(),
|
||||
|
||||
ForecastRequestStats.START_TIME.getPreferredName(),
|
||||
ForecastRequestStats.END_TIME.getPreferredName(),
|
||||
ForecastRequestStats.MESSAGE.getPreferredName(),
|
||||
ForecastRequestStats.PROGRESS.getPreferredName(),
|
||||
ForecastRequestStats.STATUS.getPreferredName(),
|
||||
|
||||
ModelSizeStats.MODEL_BYTES_FIELD.getPreferredName(),
|
||||
ModelSizeStats.TOTAL_BY_FIELD_COUNT_FIELD.getPreferredName(),
|
||||
ModelSizeStats.TOTAL_OVER_FIELD_COUNT_FIELD.getPreferredName(),
|
||||
|
|
|
@ -36,7 +36,7 @@ public class AutodetectResultTests extends AbstractSerializingTestCase<Autodetec
|
|||
ModelSizeStats.Builder modelSizeStats;
|
||||
ModelPlot modelPlot;
|
||||
Forecast forecast;
|
||||
ForecastStats forecastStats;
|
||||
ForecastRequestStats forecastRequestStats;
|
||||
CategoryDefinition categoryDefinition;
|
||||
FlushAcknowledgement flushAcknowledgement;
|
||||
String jobId = "foo";
|
||||
|
@ -92,9 +92,9 @@ public class AutodetectResultTests extends AbstractSerializingTestCase<Autodetec
|
|||
forecast = null;
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
forecastStats = new ForecastStats(jobId, randomNonNegativeLong());
|
||||
forecastRequestStats = new ForecastRequestStats(jobId, randomNonNegativeLong());
|
||||
} else {
|
||||
forecastStats = null;
|
||||
forecastRequestStats = null;
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
categoryDefinition = new CategoryDefinition(jobId);
|
||||
|
@ -108,7 +108,7 @@ public class AutodetectResultTests extends AbstractSerializingTestCase<Autodetec
|
|||
flushAcknowledgement = null;
|
||||
}
|
||||
return new AutodetectResult(bucket, records, influencers, quantiles, modelSnapshot,
|
||||
modelSizeStats == null ? null : modelSizeStats.build(), modelPlot, forecast, forecastStats, categoryDefinition,
|
||||
modelSizeStats == null ? null : modelSizeStats.build(), modelPlot, forecast, forecastRequestStats, categoryDefinition,
|
||||
flushAcknowledgement);
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,66 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.ml.job.results;
|
||||
|
||||
import org.elasticsearch.common.io.stream.Writeable.Reader;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.test.AbstractSerializingTestCase;
|
||||
import org.elasticsearch.xpack.ml.job.results.ForecastRequestStats.ForecastRequestStatus;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.time.Instant;
|
||||
|
||||
public class ForecastRequestStatsTests extends AbstractSerializingTestCase<ForecastRequestStats> {
|
||||
|
||||
@Override
|
||||
protected ForecastRequestStats parseInstance(XContentParser parser) {
|
||||
return ForecastRequestStats.PARSER.apply(parser, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ForecastRequestStats createTestInstance() {
|
||||
return createTestInstance("ForecastRequestStatsTest", randomNonNegativeLong());
|
||||
}
|
||||
|
||||
public ForecastRequestStats createTestInstance(String jobId, long forecastId) {
|
||||
ForecastRequestStats forecastRequestStats = new ForecastRequestStats(jobId, forecastId);
|
||||
|
||||
if (randomBoolean()) {
|
||||
forecastRequestStats.setRecordCount(randomLong());
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
forecastRequestStats.setMessage(randomAlphaOfLengthBetween(1, 20));
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
forecastRequestStats.setStartTimeStamp(Instant.ofEpochMilli(randomNonNegativeLong()));
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
forecastRequestStats.setEndTimeStamp(Instant.ofEpochMilli(randomNonNegativeLong()));
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
forecastRequestStats.setProgress(randomDouble());
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
forecastRequestStats.setProcessingTime(randomNonNegativeLong());
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
forecastRequestStats.setStatus(randomFrom(ForecastRequestStatus.values()));
|
||||
}
|
||||
|
||||
return forecastRequestStats;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Reader<ForecastRequestStats> instanceReader() {
|
||||
return ForecastRequestStats::new;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ForecastRequestStats doParseInstance(XContentParser parser) throws IOException {
|
||||
return ForecastRequestStats.PARSER.apply(parser, null);
|
||||
}
|
||||
|
||||
}
|
|
@ -1,46 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.ml.job.results;
|
||||
|
||||
import org.elasticsearch.common.io.stream.Writeable.Reader;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.test.AbstractSerializingTestCase;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class ForecastStatsTests extends AbstractSerializingTestCase<ForecastStats> {
|
||||
|
||||
@Override
|
||||
protected ForecastStats parseInstance(XContentParser parser) {
|
||||
return ForecastStats.PARSER.apply(parser, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ForecastStats createTestInstance() {
|
||||
return createTestInstance("ForecastStatsTest", randomNonNegativeLong());
|
||||
}
|
||||
|
||||
public ForecastStats createTestInstance(String jobId, long forecastId) {
|
||||
ForecastStats forecastStats = new ForecastStats(jobId, forecastId);
|
||||
|
||||
if (randomBoolean()) {
|
||||
forecastStats.setRecordCount(randomLong());
|
||||
}
|
||||
|
||||
return forecastStats;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Reader<ForecastStats> instanceReader() {
|
||||
return ForecastStats::new;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ForecastStats doParseInstance(XContentParser parser) throws IOException {
|
||||
return ForecastStats.PARSER.apply(parser, null);
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue