[ML] Change forecast_id to UUid, add create_time and start_time (elastic/x-pack-elasticsearch#3095)

relates elastic/x-pack-elasticsearch#3093

Original commit: elastic/x-pack-elasticsearch@f586189851
This commit is contained in:
Dimitris Athanasiou 2017-11-23 14:46:52 +00:00 committed by GitHub
parent a8b5b138a7
commit e0affd455d
16 changed files with 358 additions and 120 deletions

View File

@ -175,13 +175,13 @@ public class ForecastJobAction extends Action<ForecastJobAction.Request, Forecas
public static class Response extends BaseTasksResponse implements Writeable, ToXContentObject {
private boolean acknowledged;
private long forecastId;
private String forecastId;
Response() {
super(null, null);
}
Response(boolean acknowledged, long forecastId) {
Response(boolean acknowledged, String forecastId) {
super(null, null);
this.acknowledged = acknowledged;
this.forecastId = forecastId;
@ -191,7 +191,7 @@ public class ForecastJobAction extends Action<ForecastJobAction.Request, Forecas
return acknowledged;
}
public long getForecastId() {
public String getForecastId() {
return forecastId;
}
@ -199,14 +199,14 @@ public class ForecastJobAction extends Action<ForecastJobAction.Request, Forecas
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
acknowledged = in.readBoolean();
forecastId = in.readLong();
forecastId = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(acknowledged);
out.writeLong(forecastId);
out.writeString(forecastId);
}
@Override
@ -227,7 +227,7 @@ public class ForecastJobAction extends Action<ForecastJobAction.Request, Forecas
return false;
}
Response other = (Response) obj;
return this.acknowledged == other.acknowledged && this.forecastId == other.forecastId;
return this.acknowledged == other.acknowledged && Objects.equals(this.forecastId, other.forecastId);
}
@Override

View File

@ -336,12 +336,18 @@ public class ElasticsearchMappings {
.field(TYPE, DOUBLE)
.endObject()
.startObject(Forecast.FORECAST_ID.getPreferredName())
.field(TYPE, LONG)
.field(TYPE, KEYWORD)
.endObject();
// Forecast Stats Output
// re-used: TIMESTAMP, PROCESSING_TIME_MS, PROCESSED_RECORD_COUNT, LATEST_RECORD_TIME
builder.startObject(ForecastRequestStats.END_TIME.getPreferredName())
builder.startObject(ForecastRequestStats.START_TIME.getPreferredName())
.field(TYPE, DATE)
.endObject()
.startObject(ForecastRequestStats.END_TIME.getPreferredName())
.field(TYPE, DATE)
.endObject()
.startObject(ForecastRequestStats.CREATE_TIME.getPreferredName())
.field(TYPE, DATE)
.endObject()
.startObject(ForecastRequestStats.EXPIRY_TIME.getPreferredName())
@ -355,6 +361,9 @@ public class ElasticsearchMappings {
.endObject()
.startObject(ForecastRequestStats.STATUS.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.startObject(ForecastRequestStats.MEMORY_USAGE.getPreferredName())
.field(TYPE, LONG)
.endObject();
}

View File

@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml.job.process.autodetect.params;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.joda.DateMathParser;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.mapper.DateFieldMapper;
@ -16,18 +17,32 @@ import java.util.Objects;
public class ForecastParams {
private final String forecastId;
private final long createTime;
private final long endTime;
private final long duration;
private final long expiresIn;
private final long forecastId;
private ForecastParams(long forecastId, long endTime, long duration, long expiresIn) {
private ForecastParams(String forecastId, long createTime, long endTime, long duration, long expiresIn) {
this.forecastId = forecastId;
this.createTime = createTime;
this.endTime = endTime;
this.duration = duration;
this.expiresIn = expiresIn;
}
public String getForecastId() {
return forecastId;
}
/**
* The forecast create time in seconds from the epoch
* @return The create time in seconds from the epoch
*/
public long getCreateTime() {
return createTime;
}
/**
* The forecast end time in seconds from the epoch
* @return The end time in seconds from the epoch
@ -52,18 +67,9 @@ public class ForecastParams {
return expiresIn;
}
/**
* The forecast id
*
* @return The forecast Id
*/
public long getForecastId() {
return forecastId;
}
@Override
public int hashCode() {
return Objects.hash(forecastId, endTime, duration, expiresIn);
return Objects.hash(forecastId, createTime, endTime, duration, expiresIn);
}
@Override
@ -75,8 +81,11 @@ public class ForecastParams {
return false;
}
ForecastParams other = (ForecastParams) obj;
return Objects.equals(forecastId, other.forecastId) && Objects.equals(endTime, other.endTime) &&
Objects.equals(duration, other.duration) && Objects.equals(expiresIn, other.expiresIn);
return Objects.equals(forecastId, other.forecastId)
&& Objects.equals(createTime, other.createTime)
&& Objects.equals(endTime, other.endTime)
&& Objects.equals(duration, other.duration)
&& Objects.equals(expiresIn, other.expiresIn);
}
public static Builder builder() {
@ -84,26 +93,22 @@ public class ForecastParams {
}
public static class Builder {
private final String forecastId;
private final long createTimeEpochSecs;
private long endTimeEpochSecs;
private long durationSecs;
private long expiresInSecs;
private long startTime;
private long forecastId;
private Builder() {
startTime = System.currentTimeMillis();
forecastId = UUIDs.base64UUID();
createTimeEpochSecs = System.currentTimeMillis() / 1000;
endTimeEpochSecs = 0;
forecastId = generateId();
durationSecs = 0;
// because 0 means never expire, the default is -1
expiresInSecs = -1;
}
private long generateId() {
return startTime;
}
public Builder endTime(String endTime, ParseField paramName) {
DateMathParser dateMathParser = new DateMathParser(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER);
@ -132,7 +137,7 @@ public class ForecastParams {
throw new ElasticsearchParseException(Messages.getMessage(Messages.REST_INVALID_DURATION_AND_ENDTIME));
}
return new ForecastParams(forecastId, endTimeEpochSecs, durationSecs, expiresInSecs);
return new ForecastParams(forecastId, createTimeEpochSecs, endTimeEpochSecs, durationSecs, expiresInSecs);
}
}
}

View File

@ -151,9 +151,9 @@ public class ControlMsgToProcessWriter {
}
public void writeForecastMessage(ForecastParams params) throws IOException {
XContentBuilder builder = XContentFactory.jsonBuilder()
.startObject()
.field("forecast_id", params.getForecastId());
XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
builder.field("forecast_id", params.getForecastId());
builder.field("create_time", params.getCreateTime());
if (params.getEndTime() != 0) {
builder.field("end_time", params.getEndTime());

View File

@ -45,11 +45,11 @@ public class Forecast implements ToXContentObject, Writeable {
public static final ConstructingObjectParser<Forecast, Void> PARSER =
new ConstructingObjectParser<>(RESULT_TYPE_VALUE, a ->
new Forecast((String) a[0], (long) a[1], (Date) a[2], (long) a[3], (int) a[4]));
new Forecast((String) a[0], (String) a[1], (Date) a[2], (long) a[3], (int) a[4]));
static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), Job.ID);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), FORECAST_ID);
PARSER.declareString(ConstructingObjectParser.constructorArg(), FORECAST_ID);
PARSER.declareField(ConstructingObjectParser.constructorArg(), p -> {
if (p.currentToken() == Token.VALUE_NUMBER) {
return new Date(p.longValue());
@ -73,7 +73,7 @@ public class Forecast implements ToXContentObject, Writeable {
}
private final String jobId;
private final long forecastId;
private final String forecastId;
private final Date timestamp;
private final long bucketSpan;
private int detectorIndex;
@ -86,9 +86,9 @@ public class Forecast implements ToXContentObject, Writeable {
private double forecastUpper;
private double forecastPrediction;
public Forecast(String jobId, long forecastId, Date timestamp, long bucketSpan, int detectorIndex) {
this.jobId = jobId;
this.forecastId = forecastId;
public Forecast(String jobId, String forecastId, Date timestamp, long bucketSpan, int detectorIndex) {
this.jobId = Objects.requireNonNull(jobId);
this.forecastId = Objects.requireNonNull(forecastId);
this.timestamp = timestamp;
this.bucketSpan = bucketSpan;
this.detectorIndex = detectorIndex;
@ -96,7 +96,7 @@ public class Forecast implements ToXContentObject, Writeable {
public Forecast(StreamInput in) throws IOException {
jobId = in.readString();
forecastId = in.readLong();
forecastId = in.readString();
timestamp = new Date(in.readLong());
partitionFieldName = in.readOptionalString();
partitionFieldValue = in.readOptionalString();
@ -113,7 +113,7 @@ public class Forecast implements ToXContentObject, Writeable {
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(jobId);
out.writeLong(forecastId);
out.writeString(forecastId);
out.writeLong(timestamp.getTime());
out.writeOptionalString(partitionFieldName);
out.writeOptionalString(partitionFieldValue);
@ -165,7 +165,7 @@ public class Forecast implements ToXContentObject, Writeable {
return jobId;
}
public long getForecastId() {
public String getForecastId() {
return forecastId;
}
@ -264,7 +264,7 @@ public class Forecast implements ToXContentObject, Writeable {
}
Forecast that = (Forecast) other;
return Objects.equals(this.jobId, that.jobId) &&
forecastId == that.forecastId &&
Objects.equals(this.forecastId, that.forecastId) &&
Objects.equals(this.timestamp, that.timestamp) &&
Objects.equals(this.partitionFieldValue, that.partitionFieldValue) &&
Objects.equals(this.partitionFieldName, that.partitionFieldName) &&

View File

@ -10,9 +10,9 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser.ValueType;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.ObjectParser.ValueType;
import org.elasticsearch.xpack.ml.job.config.Job;
import java.io.IOException;
@ -36,7 +36,9 @@ public class ForecastRequestStats implements ToXContentObject, Writeable {
public static final ParseField RESULTS_FIELD = new ParseField(RESULT_TYPE_VALUE);
public static final ParseField FORECAST_ID = new ParseField("forecast_id");
public static final ParseField START_TIME = new ParseField("forecast_start_timestamp");
public static final ParseField END_TIME = new ParseField("forecast_end_timestamp");
public static final ParseField CREATE_TIME = new ParseField("forecast_create_timestamp");
public static final ParseField EXPIRY_TIME = new ParseField("forecast_expiry_timestamp");
public static final ParseField MESSAGES = new ParseField("forecast_messages");
public static final ParseField PROCESSING_TIME_MS = new ParseField("processing_time_ms");
@ -46,20 +48,24 @@ public class ForecastRequestStats implements ToXContentObject, Writeable {
public static final ParseField MEMORY_USAGE = new ParseField("forecast_memory_bytes");
public static final ConstructingObjectParser<ForecastRequestStats, Void> PARSER =
new ConstructingObjectParser<>(RESULT_TYPE_VALUE, a -> new ForecastRequestStats((String) a[0], (long) a[1]));
new ConstructingObjectParser<>(RESULT_TYPE_VALUE, a -> new ForecastRequestStats((String) a[0], (String) a[1]));
static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), Job.ID);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), FORECAST_ID);
PARSER.declareString(ConstructingObjectParser.constructorArg(), FORECAST_ID);
PARSER.declareString((modelForecastRequestStats, s) -> {}, Result.RESULT_TYPE);
PARSER.declareLong(ForecastRequestStats::setRecordCount, PROCESSED_RECORD_COUNT);
PARSER.declareStringArray(ForecastRequestStats::setMessages, MESSAGES);
PARSER.declareField(ForecastRequestStats::setTimeStamp,
p -> Instant.ofEpochMilli(p.longValue()), Result.TIMESTAMP, ValueType.LONG);
PARSER.declareField(ForecastRequestStats::setEndTimeStamp,
PARSER.declareField(ForecastRequestStats::setStartTime,
p -> Instant.ofEpochMilli(p.longValue()), START_TIME, ValueType.LONG);
PARSER.declareField(ForecastRequestStats::setEndTime,
p -> Instant.ofEpochMilli(p.longValue()), END_TIME, ValueType.LONG);
PARSER.declareField(ForecastRequestStats::setExpiryTimeStamp,
PARSER.declareField(ForecastRequestStats::setCreateTime,
p -> Instant.ofEpochMilli(p.longValue()), CREATE_TIME, ValueType.LONG);
PARSER.declareField(ForecastRequestStats::setExpiryTime,
p -> Instant.ofEpochMilli(p.longValue()), EXPIRY_TIME, ValueType.LONG);
PARSER.declareDouble(ForecastRequestStats::setProgress, PROGRESS);
PARSER.declareLong(ForecastRequestStats::setProcessingTime, PROCESSING_TIME_MS);
@ -90,34 +96,39 @@ public class ForecastRequestStats implements ToXContentObject, Writeable {
}
private final String jobId;
private final long forecastId;
private final String forecastId;
private long recordCount;
private List<String> messages;
private Instant dateStarted = Instant.EPOCH;
private Instant dateEnded = Instant.EPOCH;
private Instant dateExpires = Instant.EPOCH;
private Instant timestamp = Instant.EPOCH;
private Instant startTime = Instant.EPOCH;
private Instant endTime = Instant.EPOCH;
private Instant createTime = Instant.EPOCH;
private Instant expiryTime = Instant.EPOCH;
private double progress;
private long processingTime;
private long memoryUsage;
private ForecastRequestStatus status = ForecastRequestStatus.OK;
public ForecastRequestStats(String jobId, long forecastId) {
this.jobId = jobId;
this.forecastId = forecastId;
public ForecastRequestStats(String jobId, String forecastId) {
this.jobId = Objects.requireNonNull(jobId);
this.forecastId = Objects.requireNonNull(forecastId);
}
public ForecastRequestStats(StreamInput in) throws IOException {
jobId = in.readString();
forecastId = in.readLong();
forecastId = in.readString();
recordCount = in.readLong();
if (in.readBoolean()) {
messages = in.readList(StreamInput::readString);
} else {
messages = null;
}
dateStarted = Instant.ofEpochMilli(in.readVLong());
dateEnded = Instant.ofEpochMilli(in.readVLong());
dateExpires = Instant.ofEpochMilli(in.readVLong());
timestamp = Instant.ofEpochMilli(in.readVLong());
startTime = Instant.ofEpochMilli(in.readVLong());
endTime = Instant.ofEpochMilli(in.readVLong());
createTime = Instant.ofEpochMilli(in.readVLong());
expiryTime = Instant.ofEpochMilli(in.readVLong());
progress = in.readDouble();
processingTime = in.readLong();
setMemoryUsage(in.readLong());
@ -127,7 +138,7 @@ public class ForecastRequestStats implements ToXContentObject, Writeable {
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(jobId);
out.writeLong(forecastId);
out.writeString(forecastId);
out.writeLong(recordCount);
if (messages != null) {
out.writeBoolean(true);
@ -135,9 +146,11 @@ public class ForecastRequestStats implements ToXContentObject, Writeable {
} else {
out.writeBoolean(false);
}
out.writeVLong(dateStarted.toEpochMilli());
out.writeVLong(dateEnded.toEpochMilli());
out.writeVLong(dateExpires.toEpochMilli());
out.writeVLong(timestamp.toEpochMilli());
out.writeVLong(startTime.toEpochMilli());
out.writeVLong(endTime.toEpochMilli());
out.writeVLong(createTime.toEpochMilli());
out.writeVLong(expiryTime.toEpochMilli());
out.writeDouble(progress);
out.writeLong(processingTime);
out.writeLong(getMemoryUsage());
@ -154,14 +167,20 @@ public class ForecastRequestStats implements ToXContentObject, Writeable {
if (messages != null) {
builder.field(MESSAGES.getPreferredName(), messages);
}
if (dateStarted.equals(Instant.EPOCH) == false) {
builder.field(Result.TIMESTAMP.getPreferredName(), dateStarted.toEpochMilli());
if (timestamp.equals(Instant.EPOCH) == false) {
builder.field(Result.TIMESTAMP.getPreferredName(), timestamp.toEpochMilli());
}
if (dateEnded.equals(Instant.EPOCH) == false) {
builder.field(END_TIME.getPreferredName(), dateEnded.toEpochMilli());
if (startTime.equals(Instant.EPOCH) == false) {
builder.field(START_TIME.getPreferredName(), startTime.toEpochMilli());
}
if (dateExpires.equals(Instant.EPOCH) == false) {
builder.field(EXPIRY_TIME.getPreferredName(), dateExpires.toEpochMilli());
if (endTime.equals(Instant.EPOCH) == false) {
builder.field(END_TIME.getPreferredName(), endTime.toEpochMilli());
}
if (createTime.equals(Instant.EPOCH) == false) {
builder.field(CREATE_TIME.getPreferredName(), createTime.toEpochMilli());
}
if (expiryTime.equals(Instant.EPOCH) == false) {
builder.field(EXPIRY_TIME.getPreferredName(), expiryTime.toEpochMilli());
}
builder.field(PROGRESS.getPreferredName(), progress);
builder.field(PROCESSING_TIME_MS.getPreferredName(), processingTime);
@ -175,7 +194,7 @@ public class ForecastRequestStats implements ToXContentObject, Writeable {
return jobId;
}
public long getForecastId() {
public String getForecastId() {
return forecastId;
}
@ -193,7 +212,7 @@ public class ForecastRequestStats implements ToXContentObject, Writeable {
this.recordCount = recordCount;
}
public double getRecordCount() {
public long getRecordCount() {
return recordCount;
}
@ -205,28 +224,44 @@ public class ForecastRequestStats implements ToXContentObject, Writeable {
this.messages = messages;
}
public Instant getDateStarted() {
return dateStarted;
public void setTimeStamp(Instant timestamp) {
this.timestamp = timestamp;
}
public void setTimeStamp(Instant dateStarted) {
this.dateStarted = dateStarted;
public Instant getTimestamp() {
return timestamp;
}
public Instant getDateEnded() {
return dateEnded;
public void setStartTime(Instant startTime) {
this.startTime = startTime;
}
public void setEndTimeStamp(Instant dateEnded) {
this.dateEnded = dateEnded;
public Instant getStartTime() {
return startTime;
}
public void setExpiryTimeStamp(Instant dateExpires) {
this.dateExpires = dateExpires;
public Instant getEndTime() {
return endTime;
}
public Instant getDateExpired() {
return dateExpires;
public void setEndTime(Instant endTime) {
this.endTime = endTime;
}
public void setCreateTime(Instant createTime) {
this.createTime = createTime;
}
public Instant getCreateTime() {
return createTime;
}
public void setExpiryTime(Instant expiryTime) {
this.expiryTime = expiryTime;
}
public Instant getExpiryTime() {
return expiryTime;
}
/**
@ -278,12 +313,14 @@ public class ForecastRequestStats implements ToXContentObject, Writeable {
}
ForecastRequestStats that = (ForecastRequestStats) other;
return Objects.equals(this.jobId, that.jobId) &&
this.forecastId == that.forecastId &&
Objects.equals(this.forecastId, that.forecastId) &&
this.recordCount == that.recordCount &&
Objects.equals(this.messages, that.messages) &&
Objects.equals(this.dateStarted, that.dateStarted) &&
Objects.equals(this.dateEnded, that.dateEnded) &&
Objects.equals(this.dateExpires, that.dateExpires) &&
Objects.equals(this.timestamp, that.timestamp) &&
Objects.equals(this.startTime, that.startTime) &&
Objects.equals(this.endTime, that.endTime) &&
Objects.equals(this.createTime, that.createTime) &&
Objects.equals(this.expiryTime, that.expiryTime) &&
this.progress == that.progress &&
this.processingTime == that.processingTime &&
this.memoryUsage == that.memoryUsage &&
@ -292,7 +329,7 @@ public class ForecastRequestStats implements ToXContentObject, Writeable {
@Override
public int hashCode() {
return Objects.hash(jobId, forecastId, recordCount, messages, dateStarted, dateEnded, dateExpires,
return Objects.hash(jobId, forecastId, recordCount, messages, timestamp, startTime, endTime, createTime, expiryTime,
progress, processingTime, memoryUsage, status);
}
}

View File

@ -131,12 +131,15 @@ public final class ReservedFieldNames {
Forecast.FORECAST_PREDICTION.getPreferredName(),
Forecast.FORECAST_ID.getPreferredName(),
//re-use: ForecastRequestStats.TIMESTAMP
//re-use: TIMESTAMP
ForecastRequestStats.START_TIME.getPreferredName(),
ForecastRequestStats.END_TIME.getPreferredName(),
ForecastRequestStats.CREATE_TIME.getPreferredName(),
ForecastRequestStats.EXPIRY_TIME.getPreferredName(),
ForecastRequestStats.MESSAGES.getPreferredName(),
ForecastRequestStats.PROGRESS.getPreferredName(),
ForecastRequestStats.STATUS.getPreferredName(),
ForecastRequestStats.MEMORY_USAGE.getPreferredName(),
ModelSizeStats.MODEL_BYTES_FIELD.getPreferredName(),
ModelSizeStats.TOTAL_BY_FIELD_COUNT_FIELD.getPreferredName(),

View File

@ -119,7 +119,7 @@ public class ExpiredForecastsRemover implements MlDataRemover {
XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser(
NamedXContentRegistry.EMPTY, hit.getSourceRef());
ForecastRequestStats forecastRequestStats = ForecastRequestStats.PARSER.apply(parser, null);
if (forecastRequestStats.getDateExpired().toEpochMilli() < cutoffEpochMs) {
if (forecastRequestStats.getExpiryTime().toEpochMilli() < cutoffEpochMs) {
forecastsToDelete.add(forecastRequestStats);
}
}

View File

@ -12,7 +12,7 @@ public class ForecastJobActionResponseTests extends AbstractStreamableTestCase<R
@Override
protected Response createTestInstance() {
return new Response(randomBoolean(), randomNonNegativeLong());
return new Response(randomBoolean(), randomAlphaOfLength(20));
}
@Override

View File

@ -90,7 +90,7 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase {
registerJob(newJobBuilder("snapshots-retention-with-retain").setResultsRetentionDays(null).setModelSnapshotRetentionDays(2L));
registerJob(newJobBuilder("results-and-snapshots-retention").setResultsRetentionDays(1L).setModelSnapshotRetentionDays(2L));
List<Long> shortExpiryForecastIds = new ArrayList<>();
List<String> shortExpiryForecastIds = new ArrayList<>();
long now = System.currentTimeMillis();
long oneDayAgo = now - TimeValue.timeValueHours(48).getMillis() - 1;
@ -123,16 +123,13 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase {
// Now let's create some forecasts
openJob(job.getId());
long forecastShortExpiryId = forecast(job.getId(), TimeValue.timeValueHours(3), TimeValue.timeValueSeconds(1));
// We need to wait so that forecasts get different IDs
awaitBusy(() -> false, 5, TimeUnit.MILLISECONDS);
// We must set a very small value for expires_in to keep this testable as the deletion cutoff point is the moment
// the DeleteExpiredDataAction is called.
String forecastShortExpiryId = forecast(job.getId(), TimeValue.timeValueHours(3), TimeValue.timeValueSeconds(1));
shortExpiryForecastIds.add(forecastShortExpiryId);
long forecastDefaultExpiryId = forecast(job.getId(), TimeValue.timeValueHours(3), null);
awaitBusy(() -> false, 5, TimeUnit.MILLISECONDS);
long forecastNoExpiryId = forecast(job.getId(), TimeValue.timeValueHours(3), TimeValue.ZERO);
String forecastDefaultExpiryId = forecast(job.getId(), TimeValue.timeValueHours(3), null);
String forecastNoExpiryId = forecast(job.getId(), TimeValue.timeValueHours(3), TimeValue.ZERO);
waitForecastToFinish(job.getId(), forecastShortExpiryId);
waitForecastToFinish(job.getId(), forecastDefaultExpiryId);
waitForecastToFinish(job.getId(), forecastNoExpiryId);
@ -166,8 +163,7 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase {
List<ForecastRequestStats> forecastStats = getForecastStats();
assertThat(forecastStats.size(), equalTo(getJobs().size() * 3));
for (ForecastRequestStats forecastStat : forecastStats) {
assertThat(countForecastDocs(forecastStat.getJobId(), forecastStat.getForecastId()),
equalTo((long) forecastStat.getRecordCount()));
assertThat(countForecastDocs(forecastStat.getJobId(), forecastStat.getForecastId()), equalTo(forecastStat.getRecordCount()));
}
client().execute(DeleteExpiredDataAction.INSTANCE, new DeleteExpiredDataAction.Request()).get();
@ -213,11 +209,10 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase {
forecastStats = getForecastStats();
assertThat(forecastStats.size(), equalTo(getJobs().size() * 2));
for (ForecastRequestStats forecastStat : forecastStats) {
assertThat(countForecastDocs(forecastStat.getJobId(), forecastStat.getForecastId()),
equalTo((long) forecastStat.getRecordCount()));
assertThat(countForecastDocs(forecastStat.getJobId(), forecastStat.getForecastId()), equalTo(forecastStat.getRecordCount()));
}
for (Job.Builder job : getJobs()) {
for (long forecastId : shortExpiryForecastIds) {
for (String forecastId : shortExpiryForecastIds) {
assertThat(countForecastDocs(job.getId(), forecastId), equalTo(0L));
}
}

View File

@ -0,0 +1,143 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.integration;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.config.Detector;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.results.Bucket;
import org.elasticsearch.xpack.ml.job.results.Forecast;
import org.elasticsearch.xpack.ml.job.results.ForecastRequestStats;
import org.junit.After;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static org.hamcrest.Matchers.closeTo;
import static org.hamcrest.Matchers.equalTo;
public class ForecastIT extends MlNativeAutodetectIntegTestCase {
@After
public void tearDownData() throws Exception {
cleanUp();
}
public void testSingleSeries() throws Exception {
Detector.Builder detector = new Detector.Builder("mean", "value");
TimeValue bucketSpan = TimeValue.timeValueHours(1);
AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(detector.build()));
analysisConfig.setBucketSpan(bucketSpan);
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setTimeFormat("epoch");
Job.Builder job = new Job.Builder("forecast-it-test-single-series");
job.setAnalysisConfig(analysisConfig);
job.setDataDescription(dataDescription);
registerJob(job);
putJob(job);
openJob(job.getId());
long now = Instant.now().getEpochSecond();
long timestamp = now - 50 * bucketSpan.seconds();
List<String> data = new ArrayList<>();
while (timestamp < now) {
data.add(createJsonRecord(createRecord(timestamp, 10.0)));
data.add(createJsonRecord(createRecord(timestamp, 30.0)));
timestamp += bucketSpan.seconds();
}
postData(job.getId(), data.stream().collect(Collectors.joining()));
flushJob(job.getId(), false);
// Now we can start doing forecast requests
String forecastIdDefaultDurationDefaultExpiry = forecast(job.getId(), null, null);
String forecastIdDuration1HourNoExpiry = forecast(job.getId(), TimeValue.timeValueHours(1), TimeValue.ZERO);
String forecastIdDuration3HoursExpiresIn24Hours = forecast(job.getId(), TimeValue.timeValueHours(3), TimeValue.timeValueHours(24));
waitForecastToFinish(job.getId(), forecastIdDefaultDurationDefaultExpiry);
waitForecastToFinish(job.getId(), forecastIdDuration1HourNoExpiry);
waitForecastToFinish(job.getId(), forecastIdDuration3HoursExpiresIn24Hours);
closeJob(job.getId());
List<Bucket> buckets = getBuckets(job.getId());
Bucket lastBucket = buckets.get(buckets.size() - 1);
long lastBucketTime = lastBucket.getTimestamp().getTime();
// Now let's verify forecasts
double expectedForecastValue = 20.0;
List<ForecastRequestStats> forecastStats = getForecastStats();
assertThat(forecastStats.size(), equalTo(3));
Map<String, ForecastRequestStats> idToForecastStats = new HashMap<>();
forecastStats.stream().forEach(f -> idToForecastStats.put(f.getForecastId(), f));
{
ForecastRequestStats forecastDefaultDurationDefaultExpiry = idToForecastStats.get(forecastIdDefaultDurationDefaultExpiry);
assertThat(forecastDefaultDurationDefaultExpiry.getExpiryTime().toEpochMilli(),
equalTo(forecastDefaultDurationDefaultExpiry.getCreateTime().toEpochMilli()
+ TimeValue.timeValueHours(14 * 24).getMillis()));
List<Forecast> forecasts = getForecasts(job.getId(), forecastDefaultDurationDefaultExpiry);
assertThat(forecastDefaultDurationDefaultExpiry.getRecordCount(), equalTo(24L));
assertThat(forecasts.size(), equalTo(24));
assertThat(forecasts.get(0).getTimestamp().getTime(), equalTo(lastBucketTime));
for (int i = 0; i < forecasts.size(); i++) {
Forecast forecast = forecasts.get(i);
assertThat(forecast.getTimestamp().getTime(), equalTo(lastBucketTime + i * bucketSpan.getMillis()));
assertThat(forecast.getBucketSpan(), equalTo(bucketSpan.getSeconds()));
assertThat(forecast.getForecastPrediction(), closeTo(expectedForecastValue, 0.01));
}
}
{
ForecastRequestStats forecastDuration1HourNoExpiry = idToForecastStats.get(forecastIdDuration1HourNoExpiry);
assertThat(forecastDuration1HourNoExpiry.getExpiryTime(), equalTo(Instant.EPOCH));
List<Forecast> forecasts = getForecasts(job.getId(), forecastDuration1HourNoExpiry);
assertThat(forecastDuration1HourNoExpiry.getRecordCount(), equalTo(1L));
assertThat(forecasts.size(), equalTo(1));
assertThat(forecasts.get(0).getTimestamp().getTime(), equalTo(lastBucketTime));
for (int i = 0; i < forecasts.size(); i++) {
Forecast forecast = forecasts.get(i);
assertThat(forecast.getTimestamp().getTime(), equalTo(lastBucketTime + i * bucketSpan.getMillis()));
assertThat(forecast.getBucketSpan(), equalTo(bucketSpan.getSeconds()));
assertThat(forecast.getForecastPrediction(), closeTo(expectedForecastValue, 0.01));
}
}
{
ForecastRequestStats forecastDuration3HoursExpiresIn24Hours = idToForecastStats.get(forecastIdDuration3HoursExpiresIn24Hours);
assertThat(forecastDuration3HoursExpiresIn24Hours.getExpiryTime().toEpochMilli(),
equalTo(forecastDuration3HoursExpiresIn24Hours.getCreateTime().toEpochMilli()
+ TimeValue.timeValueHours(24).getMillis()));
List<Forecast> forecasts = getForecasts(job.getId(), forecastDuration3HoursExpiresIn24Hours);
assertThat(forecastDuration3HoursExpiresIn24Hours.getRecordCount(), equalTo(3L));
assertThat(forecasts.size(), equalTo(3));
assertThat(forecasts.get(0).getTimestamp().getTime(), equalTo(lastBucketTime));
for (int i = 0; i < forecasts.size(); i++) {
Forecast forecast = forecasts.get(i);
assertThat(forecast.getTimestamp().getTime(), equalTo(lastBucketTime + i * bucketSpan.getMillis()));
assertThat(forecast.getBucketSpan(), equalTo(bucketSpan.getSeconds()));
assertThat(forecast.getForecastPrediction(), closeTo(expectedForecastValue, 0.01));
}
}
}
private static Map<String, Object> createRecord(long timestamp, double value) {
Map<String, Object> record = new HashMap<>();
record.put("time", timestamp);
record.put("value", value);
return record;
}
}

View File

@ -26,6 +26,8 @@ import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.SecurityIntegTestCase;
import org.elasticsearch.test.SecuritySettingsSource;
@ -291,16 +293,18 @@ abstract class MlNativeAutodetectIntegTestCase extends SecurityIntegTestCase {
return client().execute(PostDataAction.INSTANCE, request).actionGet().getDataCounts();
}
protected long forecast(String jobId, TimeValue duration, TimeValue expiresIn) {
protected String forecast(String jobId, TimeValue duration, TimeValue expiresIn) {
ForecastJobAction.Request request = new ForecastJobAction.Request(jobId);
request.setDuration(duration.getStringRep());
if (duration != null) {
request.setDuration(duration.getStringRep());
}
if (expiresIn != null) {
request.setExpiresIn(expiresIn.getStringRep());
}
return client().execute(ForecastJobAction.INSTANCE, request).actionGet().getForecastId();
}
protected void waitForecastToFinish(String jobId, long forecastId) throws Exception {
protected void waitForecastToFinish(String jobId, String forecastId) throws Exception {
assertBusy(() -> {
ForecastRequestStats forecastRequestStats = getForecastStats(jobId, forecastId);
assertThat(forecastRequestStats, is(notNullValue()));
@ -308,7 +312,7 @@ abstract class MlNativeAutodetectIntegTestCase extends SecurityIntegTestCase {
}, 30, TimeUnit.SECONDS);
}
protected ForecastRequestStats getForecastStats(String jobId, long forecastId) {
protected ForecastRequestStats getForecastStats(String jobId, String forecastId) {
SearchResponse searchResponse = client().prepareSearch(AnomalyDetectorsIndex.jobResultsAliasedName(jobId))
.setQuery(QueryBuilders.boolQuery()
.filter(QueryBuilders.termQuery(Result.RESULT_TYPE.getPreferredName(), ForecastRequestStats.RESULT_TYPE_VALUE))
@ -350,7 +354,7 @@ abstract class MlNativeAutodetectIntegTestCase extends SecurityIntegTestCase {
return forecastStats;
}
protected long countForecastDocs(String jobId, long forecastId) {
protected long countForecastDocs(String jobId, String forecastId) {
SearchResponse searchResponse = client().prepareSearch(AnomalyDetectorsIndex.jobResultsIndexPrefix() + "*")
.setQuery(QueryBuilders.boolQuery()
.filter(QueryBuilders.termQuery(Result.RESULT_TYPE.getPreferredName(), Forecast.RESULT_TYPE_VALUE))
@ -360,6 +364,30 @@ abstract class MlNativeAutodetectIntegTestCase extends SecurityIntegTestCase {
return searchResponse.getHits().getTotalHits();
}
protected List<Forecast> getForecasts(String jobId, ForecastRequestStats forecastRequestStats) {
List<Forecast> forecasts = new ArrayList<>();
SearchResponse searchResponse = client().prepareSearch(AnomalyDetectorsIndex.jobResultsIndexPrefix() + "*")
.setSize((int) forecastRequestStats.getRecordCount())
.setQuery(QueryBuilders.boolQuery()
.filter(QueryBuilders.termQuery(Result.RESULT_TYPE.getPreferredName(), Forecast.RESULT_TYPE_VALUE))
.filter(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobId))
.filter(QueryBuilders.termQuery(Forecast.FORECAST_ID.getPreferredName(), forecastRequestStats.getForecastId())))
.addSort(SortBuilders.fieldSort(Result.TIMESTAMP.getPreferredName()).order(SortOrder.ASC))
.execute().actionGet();
SearchHits hits = searchResponse.getHits();
for (SearchHit hit : hits) {
try {
XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser(
NamedXContentRegistry.EMPTY, hit.getSourceRef());
forecasts.add(Forecast.PARSER.apply(parser, null));
} catch (IOException e) {
throw new IllegalStateException(e);
}
}
return forecasts;
}
@Override
protected void ensureClusterStateConsistency() throws IOException {
if (cluster() != null && cluster().size() > 0) {

View File

@ -11,6 +11,10 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.job.messages.Messages;
import java.util.HashSet;
import java.util.Set;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
@ -19,6 +23,14 @@ public class ForecastParamsTests extends ESTestCase {
private static ParseField END = new ParseField("end");
private static ParseField DURATION = new ParseField("duration");
public void testForecastIdsAreUnique() {
Set<String> ids = new HashSet<>();
for (int i = 0; i < 10; i++) {
ids.add(ForecastParams.builder().build().getForecastId());
}
assertThat(ids.size(), equalTo(10));
}
public void test_UnparseableEndTimeThrows() {
ElasticsearchParseException e =
ESTestCase.expectThrows(ElasticsearchParseException.class, () -> ForecastParams.builder().endTime("bad", END).build());

View File

@ -87,12 +87,12 @@ public class AutodetectResultTests extends AbstractSerializingTestCase<Autodetec
modelPlot = null;
}
if (randomBoolean()) {
forecast = new Forecast(jobId, randomNonNegativeLong(), new Date(randomLong()), randomNonNegativeLong(), randomInt());
forecast = new Forecast(jobId, randomAlphaOfLength(20), new Date(randomLong()), randomNonNegativeLong(), randomInt());
} else {
forecast = null;
}
if (randomBoolean()) {
forecastRequestStats = new ForecastRequestStats(jobId, randomNonNegativeLong());
forecastRequestStats = new ForecastRequestStats(jobId, randomAlphaOfLength(20));
} else {
forecastRequestStats = null;
}

View File

@ -24,10 +24,10 @@ public class ForecastRequestStatsTests extends AbstractSerializingTestCase<Forec
@Override
protected ForecastRequestStats createTestInstance() {
return createTestInstance("ForecastRequestStatsTest", randomNonNegativeLong());
return createTestInstance("ForecastRequestStatsTest", randomAlphaOfLength(20));
}
public ForecastRequestStats createTestInstance(String jobId, long forecastId) {
public ForecastRequestStats createTestInstance(String jobId, String forecastId) {
ForecastRequestStats forecastRequestStats = new ForecastRequestStats(jobId, forecastId);
if (randomBoolean()) {
@ -45,10 +45,16 @@ public class ForecastRequestStatsTests extends AbstractSerializingTestCase<Forec
forecastRequestStats.setTimeStamp(Instant.ofEpochMilli(randomNonNegativeLong()));
}
if (randomBoolean()) {
forecastRequestStats.setEndTimeStamp(Instant.ofEpochMilli(randomNonNegativeLong()));
forecastRequestStats.setStartTime(Instant.ofEpochMilli(randomNonNegativeLong()));
}
if (randomBoolean()) {
forecastRequestStats.setExpiryTimeStamp(Instant.ofEpochMilli(randomNonNegativeLong()));
forecastRequestStats.setEndTime(Instant.ofEpochMilli(randomNonNegativeLong()));
}
if (randomBoolean()) {
forecastRequestStats.setCreateTime(Instant.ofEpochMilli(randomNonNegativeLong()));
}
if (randomBoolean()) {
forecastRequestStats.setExpiryTime(Instant.ofEpochMilli(randomNonNegativeLong()));
}
if (randomBoolean()) {
forecastRequestStats.setProgress(randomDouble());

View File

@ -27,7 +27,7 @@ public class ForecastTests extends AbstractSerializingTestCase<Forecast> {
public Forecast createTestInstance(String jobId) {
Forecast forecast =
new Forecast(jobId, randomNonNegativeLong(), new Date(randomLong()),
new Forecast(jobId, randomAlphaOfLength(20), new Date(randomLong()),
randomNonNegativeLong(), randomInt());
if (randomBoolean()) {
@ -69,7 +69,7 @@ public class ForecastTests extends AbstractSerializingTestCase<Forecast> {
}
public void testId() {
Forecast forecast = new Forecast("job-foo", 222, new Date(100L), 60L, 2);
Forecast forecast = new Forecast("job-foo", "222", new Date(100L), 60L, 2);
String byFieldValue = null;
String partitionFieldValue = null;