[ML-FC] implement endpoint parameter that takes a duration (elastic/x-pack-elasticsearch#3027)

Adds a duration parameter to the forecast API. Also fixes issue if no parameter is given (forecast 1 day from time of last bucket), in which case it lets autodetect decide

depends on elastic/machine-learning-cpp#407

Original commit: elastic/x-pack-elasticsearch@3387478872
This commit is contained in:
Hendrik Muhs 2017-11-17 06:37:03 +01:00 committed by GitHub
parent 43a2572e7d
commit a583cf270b
7 changed files with 87 additions and 25 deletions

View File

@ -19,6 +19,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -56,12 +57,14 @@ public class ForecastJobAction extends Action<ForecastJobAction.Request, Forecas
public static class Request extends TransportJobTaskAction.JobTaskRequest<Request> implements ToXContentObject {
public static final ParseField END_TIME = new ParseField("end");
public static final ParseField DURATION = new ParseField("duration");
private static final ObjectParser<Request, Void> PARSER = new ObjectParser<>(NAME, Request::new);
static {
PARSER.declareString((request, jobId) -> request.jobId = jobId, Job.ID);
PARSER.declareString(Request::setEndTime, END_TIME);
PARSER.declareString(Request::setDuration, DURATION);
}
public static Request parseRequest(String jobId, XContentParser parser) {
@ -73,6 +76,7 @@ public class ForecastJobAction extends Action<ForecastJobAction.Request, Forecas
}
private String endTime;
private TimeValue duration;
Request() {
}
@ -89,21 +93,31 @@ public class ForecastJobAction extends Action<ForecastJobAction.Request, Forecas
this.endTime = endTime;
}
public TimeValue getDuration() {
return duration;
}
public void setDuration(String duration) {
this.duration = TimeValue.parseTimeValue(duration, DURATION.getPreferredName());
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
this.endTime = in.readOptionalString();
this.duration = in.readOptionalWriteable(TimeValue::new);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeOptionalString(endTime);
out.writeOptionalWriteable(duration);
}
@Override
public int hashCode() {
return Objects.hash(jobId, endTime);
return Objects.hash(jobId, endTime, duration);
}
@Override
@ -115,7 +129,7 @@ public class ForecastJobAction extends Action<ForecastJobAction.Request, Forecas
return false;
}
Request other = (Request) obj;
return Objects.equals(jobId, other.jobId) && Objects.equals(endTime, other.endTime);
return Objects.equals(jobId, other.jobId) && Objects.equals(endTime, other.endTime) && Objects.equals(duration, other.duration);
}
@Override
@ -125,6 +139,9 @@ public class ForecastJobAction extends Action<ForecastJobAction.Request, Forecas
if (endTime != null) {
builder.field(END_TIME.getPreferredName(), endTime);
}
if (duration != null) {
builder.field(DURATION.getPreferredName(), duration.getStringRep());
}
builder.endObject();
return builder;
}
@ -219,6 +236,9 @@ public class ForecastJobAction extends Action<ForecastJobAction.Request, Forecas
if (request.getEndTime() != null) {
paramsBuilder.endTime(request.getEndTime(), END_TIME);
}
if (request.getDuration() != null) {
paramsBuilder.duration(request.getDuration());
}
ForecastParams params = paramsBuilder.build();
processManager.forecastJob(task, params, e -> {

View File

@ -164,6 +164,7 @@ public final class Messages {
"Model snapshot ''{0}'' is the active snapshot for job ''{1}'', so cannot be deleted";
public static final String REST_INVALID_DATETIME_PARAMS =
"Query param [{0}] with value [{1}] cannot be parsed as a date or converted to a number (epoch).";
public static final String REST_INVALID_DURATION_AND_ENDTIME = "Specify either duration or end time";
public static final String REST_INVALID_FLUSH_PARAMS_MISSING = "Invalid flush parameters: ''{0}'' has not been specified.";
public static final String REST_INVALID_FLUSH_PARAMS_UNEXPECTED = "Invalid flush parameters: unexpected ''{0}''.";
public static final String REST_JOB_NOT_CLOSED_REVERT = "Can only revert to a model snapshot when the job is closed.";

View File

@ -8,6 +8,7 @@ package org.elasticsearch.xpack.ml.job.process.autodetect.params;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.joda.DateMathParser;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.xpack.ml.job.messages.Messages;
@ -16,11 +17,13 @@ import java.util.Objects;
public class ForecastParams {
private final long endTime;
private final long duration;
private final long id;
private ForecastParams(long id, long endTime) {
private ForecastParams(long id, long endTime, long duration) {
this.id = id;
this.endTime = endTime;
this.duration = duration;
}
/**
@ -31,6 +34,14 @@ public class ForecastParams {
return endTime;
}
/**
* The forecast duration in seconds
* @return The duration in seconds
*/
public long getDuration() {
return duration;
}
/**
* The forecast id
*
@ -42,7 +53,7 @@ public class ForecastParams {
@Override
public int hashCode() {
return Objects.hash(id, endTime);
return Objects.hash(id, endTime, duration);
}
@Override
@ -54,27 +65,24 @@ public class ForecastParams {
return false;
}
ForecastParams other = (ForecastParams) obj;
return Objects.equals(id, other.id) && Objects.equals(endTime, other.endTime);
return Objects.equals(id, other.id) && Objects.equals(endTime, other.endTime) && Objects.equals(duration, other.duration);
}
public static Builder builder() {
return new Builder();
}
public static class Builder {
private long endTimeEpochSecs;
private long durationSecs;
private long startTime;
private long forecastId;
private Builder() {
startTime = System.currentTimeMillis();
endTimeEpochSecs = tomorrow(startTime);
endTimeEpochSecs = 0;
forecastId = generateId();
}
static long tomorrow(long now) {
return (now / 1000) + (60 * 60 * 24);
durationSecs = 0;
}
private long generateId() {
@ -94,8 +102,17 @@ public class ForecastParams {
return this;
}
public Builder duration(TimeValue duration) {
durationSecs = duration.seconds();
return this;
}
public ForecastParams build() {
return new ForecastParams(forecastId, endTimeEpochSecs);
if (endTimeEpochSecs != 0 && durationSecs != 0) {
throw new ElasticsearchParseException(Messages.getMessage(Messages.REST_INVALID_DURATION_AND_ENDTIME));
}
return new ForecastParams(forecastId, endTimeEpochSecs, durationSecs);
}
}
}

View File

@ -153,9 +153,15 @@ public class ControlMsgToProcessWriter {
public void writeForecastMessage(ForecastParams params) throws IOException {
XContentBuilder builder = XContentFactory.jsonBuilder()
.startObject()
.field("forecast_id", params.getId())
.field("end_time", params.getEndTime())
.endObject();
.field("forecast_id", params.getId());
if (params.getEndTime() != 0) {
builder.field("end_time", params.getEndTime());
}
if (params.getDuration() != 0) {
builder.field("duration", params.getDuration());
}
builder.endObject();
writeMessage(FORECAST_MESSAGE_CODE + builder.string());
fillCommandBuffer();

View File

@ -43,6 +43,9 @@ public class RestForecastJobAction extends BaseRestHandler {
if (restRequest.hasParam(ForecastJobAction.Request.END_TIME.getPreferredName())) {
request.setEndTime(restRequest.param(ForecastJobAction.Request.END_TIME.getPreferredName()));
}
if (restRequest.hasParam(ForecastJobAction.Request.DURATION.getPreferredName())) {
request.setDuration(restRequest.param(ForecastJobAction.Request.DURATION.getPreferredName()));
}
}
return channel -> client.execute(ForecastJobAction.INSTANCE, request, new RestToXContentListener<>(channel));

View File

@ -5,10 +5,13 @@
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractStreamableXContentTestCase;
import org.elasticsearch.xpack.ml.action.ForecastJobAction.Request;
import java.time.Instant;
public class ForecastJobActionRequestTests extends AbstractStreamableXContentTestCase<Request> {
@Override
@ -24,6 +27,13 @@ public class ForecastJobActionRequestTests extends AbstractStreamableXContentTes
@Override
protected Request createTestInstance() {
Request request = new Request(randomAlphaOfLengthBetween(1, 20));
if (randomBoolean()) {
request.setEndTime(Instant.ofEpochMilli(randomNonNegativeLong()).toString());
}
if (randomBoolean()) {
request.setDuration(TimeValue.timeValueSeconds(randomIntBetween(1, 1_000_000)).getStringRep());
}
return request;
}
@ -31,5 +41,4 @@ public class ForecastJobActionRequestTests extends AbstractStreamableXContentTes
protected Request createBlankInstance() {
return new Request();
}
}

View File

@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml.job.process.autodetect.params;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.job.messages.Messages;
@ -16,15 +17,7 @@ import static org.hamcrest.Matchers.lessThanOrEqualTo;
public class ForecastParamsTests extends ESTestCase {
private static ParseField END = new ParseField("end");
public void testDefault_GivesTomorrowTimeInSeconds() {
long nowSecs = System.currentTimeMillis() / 1000;
nowSecs += 60 * 60 * 24;
ForecastParams params = ForecastParams.builder().build();
assertThat(params.getEndTime(), greaterThanOrEqualTo(nowSecs));
assertThat(params.getEndTime(), lessThanOrEqualTo(nowSecs +1));
}
private static ParseField DURATION = new ParseField("duration");
public void test_UnparseableEndTimeThrows() {
ElasticsearchParseException e =
@ -41,4 +34,17 @@ public class ForecastParamsTests extends ESTestCase {
assertThat(end, greaterThanOrEqualTo(nowSecs + 7200));
assertThat(end, lessThanOrEqualTo(nowSecs + 7200 +1));
}
public void testDurationFormats() {
assertEquals(34678L,
ForecastParams.builder().duration(TimeValue.parseTimeValue("34678s", DURATION.getPreferredName())).build().getDuration());
assertEquals(172800L,
ForecastParams.builder().duration(TimeValue.parseTimeValue("2d", DURATION.getPreferredName())).build().getDuration());
}
public void testDurationEndTimeThrows() {
ElasticsearchParseException e = ESTestCase.expectThrows(ElasticsearchParseException.class, () -> ForecastParams.builder()
.endTime("2016-05-01T10:00:00Z", END).duration(TimeValue.parseTimeValue("33d", DURATION.getPreferredName())).build());
assertEquals(Messages.getMessage(Messages.REST_INVALID_DURATION_AND_ENDTIME), e.getMessage());
}
}