diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/ForecastJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/ForecastJobAction.java index f738ff15829..43e3dd6202f 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/ForecastJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/ForecastJobAction.java @@ -175,13 +175,13 @@ public class ForecastJobAction extends Action PARSER = new ConstructingObjectParser<>(RESULT_TYPE_VALUE, a -> - new Forecast((String) a[0], (long) a[1], (Date) a[2], (long) a[3], (int) a[4])); + new Forecast((String) a[0], (String) a[1], (Date) a[2], (long) a[3], (int) a[4])); static { PARSER.declareString(ConstructingObjectParser.constructorArg(), Job.ID); - PARSER.declareLong(ConstructingObjectParser.constructorArg(), FORECAST_ID); + PARSER.declareString(ConstructingObjectParser.constructorArg(), FORECAST_ID); PARSER.declareField(ConstructingObjectParser.constructorArg(), p -> { if (p.currentToken() == Token.VALUE_NUMBER) { return new Date(p.longValue()); @@ -73,7 +73,7 @@ public class Forecast implements ToXContentObject, Writeable { } private final String jobId; - private final long forecastId; + private final String forecastId; private final Date timestamp; private final long bucketSpan; private int detectorIndex; @@ -86,9 +86,9 @@ public class Forecast implements ToXContentObject, Writeable { private double forecastUpper; private double forecastPrediction; - public Forecast(String jobId, long forecastId, Date timestamp, long bucketSpan, int detectorIndex) { - this.jobId = jobId; - this.forecastId = forecastId; + public Forecast(String jobId, String forecastId, Date timestamp, long bucketSpan, int detectorIndex) { + this.jobId = Objects.requireNonNull(jobId); + this.forecastId = Objects.requireNonNull(forecastId); this.timestamp = timestamp; this.bucketSpan = bucketSpan; this.detectorIndex = detectorIndex; @@ -96,7 +96,7 @@ public class Forecast implements ToXContentObject, Writeable { public Forecast(StreamInput in) throws IOException { jobId = in.readString(); - forecastId = in.readLong(); + forecastId = in.readString(); timestamp = new Date(in.readLong()); partitionFieldName = in.readOptionalString(); partitionFieldValue = in.readOptionalString(); @@ -113,7 +113,7 @@ public class Forecast implements ToXContentObject, Writeable { @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(jobId); - out.writeLong(forecastId); + out.writeString(forecastId); out.writeLong(timestamp.getTime()); out.writeOptionalString(partitionFieldName); out.writeOptionalString(partitionFieldValue); @@ -165,7 +165,7 @@ public class Forecast implements ToXContentObject, Writeable { return jobId; } - public long getForecastId() { + public String getForecastId() { return forecastId; } @@ -264,7 +264,7 @@ public class Forecast implements ToXContentObject, Writeable { } Forecast that = (Forecast) other; return Objects.equals(this.jobId, that.jobId) && - forecastId == that.forecastId && + Objects.equals(this.forecastId, that.forecastId) && Objects.equals(this.timestamp, that.timestamp) && Objects.equals(this.partitionFieldValue, that.partitionFieldValue) && Objects.equals(this.partitionFieldName, that.partitionFieldName) && diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/ForecastRequestStats.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/ForecastRequestStats.java index 05cee35a6ff..5bd9a9f90e6 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/ForecastRequestStats.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/ForecastRequestStats.java @@ -10,9 +10,9 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ObjectParser.ValueType; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.ObjectParser.ValueType; import org.elasticsearch.xpack.ml.job.config.Job; import java.io.IOException; @@ -36,7 +36,9 @@ public class ForecastRequestStats implements ToXContentObject, Writeable { public static final ParseField RESULTS_FIELD = new ParseField(RESULT_TYPE_VALUE); public static final ParseField FORECAST_ID = new ParseField("forecast_id"); + public static final ParseField START_TIME = new ParseField("forecast_start_timestamp"); public static final ParseField END_TIME = new ParseField("forecast_end_timestamp"); + public static final ParseField CREATE_TIME = new ParseField("forecast_create_timestamp"); public static final ParseField EXPIRY_TIME = new ParseField("forecast_expiry_timestamp"); public static final ParseField MESSAGES = new ParseField("forecast_messages"); public static final ParseField PROCESSING_TIME_MS = new ParseField("processing_time_ms"); @@ -46,20 +48,24 @@ public class ForecastRequestStats implements ToXContentObject, Writeable { public static final ParseField MEMORY_USAGE = new ParseField("forecast_memory_bytes"); public static final ConstructingObjectParser PARSER = - new ConstructingObjectParser<>(RESULT_TYPE_VALUE, a -> new ForecastRequestStats((String) a[0], (long) a[1])); + new ConstructingObjectParser<>(RESULT_TYPE_VALUE, a -> new ForecastRequestStats((String) a[0], (String) a[1])); static { PARSER.declareString(ConstructingObjectParser.constructorArg(), Job.ID); - PARSER.declareLong(ConstructingObjectParser.constructorArg(), FORECAST_ID); + PARSER.declareString(ConstructingObjectParser.constructorArg(), FORECAST_ID); PARSER.declareString((modelForecastRequestStats, s) -> {}, Result.RESULT_TYPE); PARSER.declareLong(ForecastRequestStats::setRecordCount, PROCESSED_RECORD_COUNT); PARSER.declareStringArray(ForecastRequestStats::setMessages, MESSAGES); PARSER.declareField(ForecastRequestStats::setTimeStamp, p -> Instant.ofEpochMilli(p.longValue()), Result.TIMESTAMP, ValueType.LONG); - PARSER.declareField(ForecastRequestStats::setEndTimeStamp, + PARSER.declareField(ForecastRequestStats::setStartTime, + p -> Instant.ofEpochMilli(p.longValue()), START_TIME, ValueType.LONG); + PARSER.declareField(ForecastRequestStats::setEndTime, p -> Instant.ofEpochMilli(p.longValue()), END_TIME, ValueType.LONG); - PARSER.declareField(ForecastRequestStats::setExpiryTimeStamp, + PARSER.declareField(ForecastRequestStats::setCreateTime, + p -> Instant.ofEpochMilli(p.longValue()), CREATE_TIME, ValueType.LONG); + PARSER.declareField(ForecastRequestStats::setExpiryTime, p -> Instant.ofEpochMilli(p.longValue()), EXPIRY_TIME, ValueType.LONG); PARSER.declareDouble(ForecastRequestStats::setProgress, PROGRESS); PARSER.declareLong(ForecastRequestStats::setProcessingTime, PROCESSING_TIME_MS); @@ -90,34 +96,39 @@ public class ForecastRequestStats implements ToXContentObject, Writeable { } private final String jobId; - private final long forecastId; + private final String forecastId; private long recordCount; private List messages; - private Instant dateStarted = Instant.EPOCH; - private Instant dateEnded = Instant.EPOCH; - private Instant dateExpires = Instant.EPOCH; + private Instant timestamp = Instant.EPOCH; + private Instant startTime = Instant.EPOCH; + private Instant endTime = Instant.EPOCH; + private Instant createTime = Instant.EPOCH; + private Instant expiryTime = Instant.EPOCH; private double progress; private long processingTime; private long memoryUsage; private ForecastRequestStatus status = ForecastRequestStatus.OK; - public ForecastRequestStats(String jobId, long forecastId) { - this.jobId = jobId; - this.forecastId = forecastId; + public ForecastRequestStats(String jobId, String forecastId) { + this.jobId = Objects.requireNonNull(jobId); + this.forecastId = Objects.requireNonNull(forecastId); } public ForecastRequestStats(StreamInput in) throws IOException { jobId = in.readString(); - forecastId = in.readLong(); + forecastId = in.readString(); recordCount = in.readLong(); if (in.readBoolean()) { messages = in.readList(StreamInput::readString); } else { messages = null; } - dateStarted = Instant.ofEpochMilli(in.readVLong()); - dateEnded = Instant.ofEpochMilli(in.readVLong()); - dateExpires = Instant.ofEpochMilli(in.readVLong()); + + timestamp = Instant.ofEpochMilli(in.readVLong()); + startTime = Instant.ofEpochMilli(in.readVLong()); + endTime = Instant.ofEpochMilli(in.readVLong()); + createTime = Instant.ofEpochMilli(in.readVLong()); + expiryTime = Instant.ofEpochMilli(in.readVLong()); progress = in.readDouble(); processingTime = in.readLong(); setMemoryUsage(in.readLong()); @@ -127,7 +138,7 @@ public class ForecastRequestStats implements ToXContentObject, Writeable { @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(jobId); - out.writeLong(forecastId); + out.writeString(forecastId); out.writeLong(recordCount); if (messages != null) { out.writeBoolean(true); @@ -135,9 +146,11 @@ public class ForecastRequestStats implements ToXContentObject, Writeable { } else { out.writeBoolean(false); } - out.writeVLong(dateStarted.toEpochMilli()); - out.writeVLong(dateEnded.toEpochMilli()); - out.writeVLong(dateExpires.toEpochMilli()); + out.writeVLong(timestamp.toEpochMilli()); + out.writeVLong(startTime.toEpochMilli()); + out.writeVLong(endTime.toEpochMilli()); + out.writeVLong(createTime.toEpochMilli()); + out.writeVLong(expiryTime.toEpochMilli()); out.writeDouble(progress); out.writeLong(processingTime); out.writeLong(getMemoryUsage()); @@ -154,14 +167,20 @@ public class ForecastRequestStats implements ToXContentObject, Writeable { if (messages != null) { builder.field(MESSAGES.getPreferredName(), messages); } - if (dateStarted.equals(Instant.EPOCH) == false) { - builder.field(Result.TIMESTAMP.getPreferredName(), dateStarted.toEpochMilli()); + if (timestamp.equals(Instant.EPOCH) == false) { + builder.field(Result.TIMESTAMP.getPreferredName(), timestamp.toEpochMilli()); } - if (dateEnded.equals(Instant.EPOCH) == false) { - builder.field(END_TIME.getPreferredName(), dateEnded.toEpochMilli()); + if (startTime.equals(Instant.EPOCH) == false) { + builder.field(START_TIME.getPreferredName(), startTime.toEpochMilli()); } - if (dateExpires.equals(Instant.EPOCH) == false) { - builder.field(EXPIRY_TIME.getPreferredName(), dateExpires.toEpochMilli()); + if (endTime.equals(Instant.EPOCH) == false) { + builder.field(END_TIME.getPreferredName(), endTime.toEpochMilli()); + } + if (createTime.equals(Instant.EPOCH) == false) { + builder.field(CREATE_TIME.getPreferredName(), createTime.toEpochMilli()); + } + if (expiryTime.equals(Instant.EPOCH) == false) { + builder.field(EXPIRY_TIME.getPreferredName(), expiryTime.toEpochMilli()); } builder.field(PROGRESS.getPreferredName(), progress); builder.field(PROCESSING_TIME_MS.getPreferredName(), processingTime); @@ -175,7 +194,7 @@ public class ForecastRequestStats implements ToXContentObject, Writeable { return jobId; } - public long getForecastId() { + public String getForecastId() { return forecastId; } @@ -193,7 +212,7 @@ public class ForecastRequestStats implements ToXContentObject, Writeable { this.recordCount = recordCount; } - public double getRecordCount() { + public long getRecordCount() { return recordCount; } @@ -205,28 +224,44 @@ public class ForecastRequestStats implements ToXContentObject, Writeable { this.messages = messages; } - public Instant getDateStarted() { - return dateStarted; + public void setTimeStamp(Instant timestamp) { + this.timestamp = timestamp; } - public void setTimeStamp(Instant dateStarted) { - this.dateStarted = dateStarted; + public Instant getTimestamp() { + return timestamp; } - public Instant getDateEnded() { - return dateEnded; + public void setStartTime(Instant startTime) { + this.startTime = startTime; } - public void setEndTimeStamp(Instant dateEnded) { - this.dateEnded = dateEnded; + public Instant getStartTime() { + return startTime; } - public void setExpiryTimeStamp(Instant dateExpires) { - this.dateExpires = dateExpires; + public Instant getEndTime() { + return endTime; } - public Instant getDateExpired() { - return dateExpires; + public void setEndTime(Instant endTime) { + this.endTime = endTime; + } + + public void setCreateTime(Instant createTime) { + this.createTime = createTime; + } + + public Instant getCreateTime() { + return createTime; + } + + public void setExpiryTime(Instant expiryTime) { + this.expiryTime = expiryTime; + } + + public Instant getExpiryTime() { + return expiryTime; } /** @@ -278,12 +313,14 @@ public class ForecastRequestStats implements ToXContentObject, Writeable { } ForecastRequestStats that = (ForecastRequestStats) other; return Objects.equals(this.jobId, that.jobId) && - this.forecastId == that.forecastId && + Objects.equals(this.forecastId, that.forecastId) && this.recordCount == that.recordCount && Objects.equals(this.messages, that.messages) && - Objects.equals(this.dateStarted, that.dateStarted) && - Objects.equals(this.dateEnded, that.dateEnded) && - Objects.equals(this.dateExpires, that.dateExpires) && + Objects.equals(this.timestamp, that.timestamp) && + Objects.equals(this.startTime, that.startTime) && + Objects.equals(this.endTime, that.endTime) && + Objects.equals(this.createTime, that.createTime) && + Objects.equals(this.expiryTime, that.expiryTime) && this.progress == that.progress && this.processingTime == that.processingTime && this.memoryUsage == that.memoryUsage && @@ -292,7 +329,7 @@ public class ForecastRequestStats implements ToXContentObject, Writeable { @Override public int hashCode() { - return Objects.hash(jobId, forecastId, recordCount, messages, dateStarted, dateEnded, dateExpires, + return Objects.hash(jobId, forecastId, recordCount, messages, timestamp, startTime, endTime, createTime, expiryTime, progress, processingTime, memoryUsage, status); } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/ReservedFieldNames.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/ReservedFieldNames.java index 2a4fb7f30dc..d65c9a68612 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/ReservedFieldNames.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/ReservedFieldNames.java @@ -131,12 +131,15 @@ public final class ReservedFieldNames { Forecast.FORECAST_PREDICTION.getPreferredName(), Forecast.FORECAST_ID.getPreferredName(), - //re-use: ForecastRequestStats.TIMESTAMP + //re-use: TIMESTAMP + ForecastRequestStats.START_TIME.getPreferredName(), ForecastRequestStats.END_TIME.getPreferredName(), + ForecastRequestStats.CREATE_TIME.getPreferredName(), ForecastRequestStats.EXPIRY_TIME.getPreferredName(), ForecastRequestStats.MESSAGES.getPreferredName(), ForecastRequestStats.PROGRESS.getPreferredName(), ForecastRequestStats.STATUS.getPreferredName(), + ForecastRequestStats.MEMORY_USAGE.getPreferredName(), ModelSizeStats.MODEL_BYTES_FIELD.getPreferredName(), ModelSizeStats.TOTAL_BY_FIELD_COUNT_FIELD.getPreferredName(), diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemover.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemover.java index a395cba0b53..21330c84575 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemover.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemover.java @@ -119,7 +119,7 @@ public class ExpiredForecastsRemover implements MlDataRemover { XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser( NamedXContentRegistry.EMPTY, hit.getSourceRef()); ForecastRequestStats forecastRequestStats = ForecastRequestStats.PARSER.apply(parser, null); - if (forecastRequestStats.getDateExpired().toEpochMilli() < cutoffEpochMs) { + if (forecastRequestStats.getExpiryTime().toEpochMilli() < cutoffEpochMs) { forecastsToDelete.add(forecastRequestStats); } } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/ForecastJobActionResponseTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/ForecastJobActionResponseTests.java index 7933b49f3fd..a3e14c53b8e 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/ForecastJobActionResponseTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/ForecastJobActionResponseTests.java @@ -12,7 +12,7 @@ public class ForecastJobActionResponseTests extends AbstractStreamableTestCase shortExpiryForecastIds = new ArrayList<>(); + List shortExpiryForecastIds = new ArrayList<>(); long now = System.currentTimeMillis(); long oneDayAgo = now - TimeValue.timeValueHours(48).getMillis() - 1; @@ -123,16 +123,13 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase { // Now let's create some forecasts openJob(job.getId()); - long forecastShortExpiryId = forecast(job.getId(), TimeValue.timeValueHours(3), TimeValue.timeValueSeconds(1)); - // We need to wait so that forecasts get different IDs - awaitBusy(() -> false, 5, TimeUnit.MILLISECONDS); + + // We must set a very small value for expires_in to keep this testable as the deletion cutoff point is the moment + // the DeleteExpiredDataAction is called. + String forecastShortExpiryId = forecast(job.getId(), TimeValue.timeValueHours(3), TimeValue.timeValueSeconds(1)); shortExpiryForecastIds.add(forecastShortExpiryId); - - long forecastDefaultExpiryId = forecast(job.getId(), TimeValue.timeValueHours(3), null); - awaitBusy(() -> false, 5, TimeUnit.MILLISECONDS); - - long forecastNoExpiryId = forecast(job.getId(), TimeValue.timeValueHours(3), TimeValue.ZERO); - + String forecastDefaultExpiryId = forecast(job.getId(), TimeValue.timeValueHours(3), null); + String forecastNoExpiryId = forecast(job.getId(), TimeValue.timeValueHours(3), TimeValue.ZERO); waitForecastToFinish(job.getId(), forecastShortExpiryId); waitForecastToFinish(job.getId(), forecastDefaultExpiryId); waitForecastToFinish(job.getId(), forecastNoExpiryId); @@ -166,8 +163,7 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase { List forecastStats = getForecastStats(); assertThat(forecastStats.size(), equalTo(getJobs().size() * 3)); for (ForecastRequestStats forecastStat : forecastStats) { - assertThat(countForecastDocs(forecastStat.getJobId(), forecastStat.getForecastId()), - equalTo((long) forecastStat.getRecordCount())); + assertThat(countForecastDocs(forecastStat.getJobId(), forecastStat.getForecastId()), equalTo(forecastStat.getRecordCount())); } client().execute(DeleteExpiredDataAction.INSTANCE, new DeleteExpiredDataAction.Request()).get(); @@ -213,11 +209,10 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase { forecastStats = getForecastStats(); assertThat(forecastStats.size(), equalTo(getJobs().size() * 2)); for (ForecastRequestStats forecastStat : forecastStats) { - assertThat(countForecastDocs(forecastStat.getJobId(), forecastStat.getForecastId()), - equalTo((long) forecastStat.getRecordCount())); + assertThat(countForecastDocs(forecastStat.getJobId(), forecastStat.getForecastId()), equalTo(forecastStat.getRecordCount())); } for (Job.Builder job : getJobs()) { - for (long forecastId : shortExpiryForecastIds) { + for (String forecastId : shortExpiryForecastIds) { assertThat(countForecastDocs(job.getId(), forecastId), equalTo(0L)); } } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/ForecastIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/ForecastIT.java new file mode 100644 index 00000000000..2c9742b67fb --- /dev/null +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/ForecastIT.java @@ -0,0 +1,143 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.integration; + +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.xpack.ml.job.config.AnalysisConfig; +import org.elasticsearch.xpack.ml.job.config.DataDescription; +import org.elasticsearch.xpack.ml.job.config.Detector; +import org.elasticsearch.xpack.ml.job.config.Job; +import org.elasticsearch.xpack.ml.job.results.Bucket; +import org.elasticsearch.xpack.ml.job.results.Forecast; +import org.elasticsearch.xpack.ml.job.results.ForecastRequestStats; +import org.junit.After; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.hamcrest.Matchers.closeTo; +import static org.hamcrest.Matchers.equalTo; + +public class ForecastIT extends MlNativeAutodetectIntegTestCase { + + @After + public void tearDownData() throws Exception { + cleanUp(); + } + + public void testSingleSeries() throws Exception { + Detector.Builder detector = new Detector.Builder("mean", "value"); + + TimeValue bucketSpan = TimeValue.timeValueHours(1); + AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(detector.build())); + analysisConfig.setBucketSpan(bucketSpan); + DataDescription.Builder dataDescription = new DataDescription.Builder(); + dataDescription.setTimeFormat("epoch"); + Job.Builder job = new Job.Builder("forecast-it-test-single-series"); + job.setAnalysisConfig(analysisConfig); + job.setDataDescription(dataDescription); + + registerJob(job); + putJob(job); + openJob(job.getId()); + + long now = Instant.now().getEpochSecond(); + long timestamp = now - 50 * bucketSpan.seconds(); + List data = new ArrayList<>(); + while (timestamp < now) { + data.add(createJsonRecord(createRecord(timestamp, 10.0))); + data.add(createJsonRecord(createRecord(timestamp, 30.0))); + timestamp += bucketSpan.seconds(); + } + + postData(job.getId(), data.stream().collect(Collectors.joining())); + flushJob(job.getId(), false); + + // Now we can start doing forecast requests + + String forecastIdDefaultDurationDefaultExpiry = forecast(job.getId(), null, null); + String forecastIdDuration1HourNoExpiry = forecast(job.getId(), TimeValue.timeValueHours(1), TimeValue.ZERO); + String forecastIdDuration3HoursExpiresIn24Hours = forecast(job.getId(), TimeValue.timeValueHours(3), TimeValue.timeValueHours(24)); + + waitForecastToFinish(job.getId(), forecastIdDefaultDurationDefaultExpiry); + waitForecastToFinish(job.getId(), forecastIdDuration1HourNoExpiry); + waitForecastToFinish(job.getId(), forecastIdDuration3HoursExpiresIn24Hours); + closeJob(job.getId()); + + List buckets = getBuckets(job.getId()); + Bucket lastBucket = buckets.get(buckets.size() - 1); + long lastBucketTime = lastBucket.getTimestamp().getTime(); + + // Now let's verify forecasts + double expectedForecastValue = 20.0; + + List forecastStats = getForecastStats(); + assertThat(forecastStats.size(), equalTo(3)); + Map idToForecastStats = new HashMap<>(); + forecastStats.stream().forEach(f -> idToForecastStats.put(f.getForecastId(), f)); + + { + ForecastRequestStats forecastDefaultDurationDefaultExpiry = idToForecastStats.get(forecastIdDefaultDurationDefaultExpiry); + assertThat(forecastDefaultDurationDefaultExpiry.getExpiryTime().toEpochMilli(), + equalTo(forecastDefaultDurationDefaultExpiry.getCreateTime().toEpochMilli() + + TimeValue.timeValueHours(14 * 24).getMillis())); + List forecasts = getForecasts(job.getId(), forecastDefaultDurationDefaultExpiry); + assertThat(forecastDefaultDurationDefaultExpiry.getRecordCount(), equalTo(24L)); + assertThat(forecasts.size(), equalTo(24)); + assertThat(forecasts.get(0).getTimestamp().getTime(), equalTo(lastBucketTime)); + for (int i = 0; i < forecasts.size(); i++) { + Forecast forecast = forecasts.get(i); + assertThat(forecast.getTimestamp().getTime(), equalTo(lastBucketTime + i * bucketSpan.getMillis())); + assertThat(forecast.getBucketSpan(), equalTo(bucketSpan.getSeconds())); + assertThat(forecast.getForecastPrediction(), closeTo(expectedForecastValue, 0.01)); + } + } + + { + ForecastRequestStats forecastDuration1HourNoExpiry = idToForecastStats.get(forecastIdDuration1HourNoExpiry); + assertThat(forecastDuration1HourNoExpiry.getExpiryTime(), equalTo(Instant.EPOCH)); + List forecasts = getForecasts(job.getId(), forecastDuration1HourNoExpiry); + assertThat(forecastDuration1HourNoExpiry.getRecordCount(), equalTo(1L)); + assertThat(forecasts.size(), equalTo(1)); + assertThat(forecasts.get(0).getTimestamp().getTime(), equalTo(lastBucketTime)); + for (int i = 0; i < forecasts.size(); i++) { + Forecast forecast = forecasts.get(i); + assertThat(forecast.getTimestamp().getTime(), equalTo(lastBucketTime + i * bucketSpan.getMillis())); + assertThat(forecast.getBucketSpan(), equalTo(bucketSpan.getSeconds())); + assertThat(forecast.getForecastPrediction(), closeTo(expectedForecastValue, 0.01)); + } + } + + { + ForecastRequestStats forecastDuration3HoursExpiresIn24Hours = idToForecastStats.get(forecastIdDuration3HoursExpiresIn24Hours); + assertThat(forecastDuration3HoursExpiresIn24Hours.getExpiryTime().toEpochMilli(), + equalTo(forecastDuration3HoursExpiresIn24Hours.getCreateTime().toEpochMilli() + + TimeValue.timeValueHours(24).getMillis())); + List forecasts = getForecasts(job.getId(), forecastDuration3HoursExpiresIn24Hours); + assertThat(forecastDuration3HoursExpiresIn24Hours.getRecordCount(), equalTo(3L)); + assertThat(forecasts.size(), equalTo(3)); + assertThat(forecasts.get(0).getTimestamp().getTime(), equalTo(lastBucketTime)); + for (int i = 0; i < forecasts.size(); i++) { + Forecast forecast = forecasts.get(i); + assertThat(forecast.getTimestamp().getTime(), equalTo(lastBucketTime + i * bucketSpan.getMillis())); + assertThat(forecast.getBucketSpan(), equalTo(bucketSpan.getSeconds())); + assertThat(forecast.getForecastPrediction(), closeTo(expectedForecastValue, 0.01)); + } + } + } + + private static Map createRecord(long timestamp, double value) { + Map record = new HashMap<>(); + record.put("time", timestamp); + record.put("value", value); + return record; + } +} diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java index cc75ca31d0e..4d2f644c7c5 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java @@ -26,6 +26,8 @@ import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.SearchModule; +import org.elasticsearch.search.sort.SortBuilders; +import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.SecurityIntegTestCase; import org.elasticsearch.test.SecuritySettingsSource; @@ -291,16 +293,18 @@ abstract class MlNativeAutodetectIntegTestCase extends SecurityIntegTestCase { return client().execute(PostDataAction.INSTANCE, request).actionGet().getDataCounts(); } - protected long forecast(String jobId, TimeValue duration, TimeValue expiresIn) { + protected String forecast(String jobId, TimeValue duration, TimeValue expiresIn) { ForecastJobAction.Request request = new ForecastJobAction.Request(jobId); - request.setDuration(duration.getStringRep()); + if (duration != null) { + request.setDuration(duration.getStringRep()); + } if (expiresIn != null) { request.setExpiresIn(expiresIn.getStringRep()); } return client().execute(ForecastJobAction.INSTANCE, request).actionGet().getForecastId(); } - protected void waitForecastToFinish(String jobId, long forecastId) throws Exception { + protected void waitForecastToFinish(String jobId, String forecastId) throws Exception { assertBusy(() -> { ForecastRequestStats forecastRequestStats = getForecastStats(jobId, forecastId); assertThat(forecastRequestStats, is(notNullValue())); @@ -308,7 +312,7 @@ abstract class MlNativeAutodetectIntegTestCase extends SecurityIntegTestCase { }, 30, TimeUnit.SECONDS); } - protected ForecastRequestStats getForecastStats(String jobId, long forecastId) { + protected ForecastRequestStats getForecastStats(String jobId, String forecastId) { SearchResponse searchResponse = client().prepareSearch(AnomalyDetectorsIndex.jobResultsAliasedName(jobId)) .setQuery(QueryBuilders.boolQuery() .filter(QueryBuilders.termQuery(Result.RESULT_TYPE.getPreferredName(), ForecastRequestStats.RESULT_TYPE_VALUE)) @@ -350,7 +354,7 @@ abstract class MlNativeAutodetectIntegTestCase extends SecurityIntegTestCase { return forecastStats; } - protected long countForecastDocs(String jobId, long forecastId) { + protected long countForecastDocs(String jobId, String forecastId) { SearchResponse searchResponse = client().prepareSearch(AnomalyDetectorsIndex.jobResultsIndexPrefix() + "*") .setQuery(QueryBuilders.boolQuery() .filter(QueryBuilders.termQuery(Result.RESULT_TYPE.getPreferredName(), Forecast.RESULT_TYPE_VALUE)) @@ -360,6 +364,30 @@ abstract class MlNativeAutodetectIntegTestCase extends SecurityIntegTestCase { return searchResponse.getHits().getTotalHits(); } + protected List getForecasts(String jobId, ForecastRequestStats forecastRequestStats) { + List forecasts = new ArrayList<>(); + + SearchResponse searchResponse = client().prepareSearch(AnomalyDetectorsIndex.jobResultsIndexPrefix() + "*") + .setSize((int) forecastRequestStats.getRecordCount()) + .setQuery(QueryBuilders.boolQuery() + .filter(QueryBuilders.termQuery(Result.RESULT_TYPE.getPreferredName(), Forecast.RESULT_TYPE_VALUE)) + .filter(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobId)) + .filter(QueryBuilders.termQuery(Forecast.FORECAST_ID.getPreferredName(), forecastRequestStats.getForecastId()))) + .addSort(SortBuilders.fieldSort(Result.TIMESTAMP.getPreferredName()).order(SortOrder.ASC)) + .execute().actionGet(); + SearchHits hits = searchResponse.getHits(); + for (SearchHit hit : hits) { + try { + XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser( + NamedXContentRegistry.EMPTY, hit.getSourceRef()); + forecasts.add(Forecast.PARSER.apply(parser, null)); + } catch (IOException e) { + throw new IllegalStateException(e); + } + } + return forecasts; + } + @Override protected void ensureClusterStateConsistency() throws IOException { if (cluster() != null && cluster().size() > 0) { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/ForecastParamsTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/ForecastParamsTests.java index 33342ae1991..56e07551a24 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/ForecastParamsTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/ForecastParamsTests.java @@ -11,6 +11,10 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.ml.job.messages.Messages; +import java.util.HashSet; +import java.util.Set; + +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.lessThanOrEqualTo; @@ -19,6 +23,14 @@ public class ForecastParamsTests extends ESTestCase { private static ParseField END = new ParseField("end"); private static ParseField DURATION = new ParseField("duration"); + public void testForecastIdsAreUnique() { + Set ids = new HashSet<>(); + for (int i = 0; i < 10; i++) { + ids.add(ForecastParams.builder().build().getForecastId()); + } + assertThat(ids.size(), equalTo(10)); + } + public void test_UnparseableEndTimeThrows() { ElasticsearchParseException e = ESTestCase.expectThrows(ElasticsearchParseException.class, () -> ForecastParams.builder().endTime("bad", END).build()); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/results/AutodetectResultTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/results/AutodetectResultTests.java index 3dd8693fe80..7ffd9a2048f 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/results/AutodetectResultTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/results/AutodetectResultTests.java @@ -87,12 +87,12 @@ public class AutodetectResultTests extends AbstractSerializingTestCase { public Forecast createTestInstance(String jobId) { Forecast forecast = - new Forecast(jobId, randomNonNegativeLong(), new Date(randomLong()), + new Forecast(jobId, randomAlphaOfLength(20), new Date(randomLong()), randomNonNegativeLong(), randomInt()); if (randomBoolean()) { @@ -69,7 +69,7 @@ public class ForecastTests extends AbstractSerializingTestCase { } public void testId() { - Forecast forecast = new Forecast("job-foo", 222, new Date(100L), 60L, 2); + Forecast forecast = new Forecast("job-foo", "222", new Date(100L), 60L, 2); String byFieldValue = null; String partitionFieldValue = null;