[ML-FC] add expires_in parameter and change forecast_start_timestamp to timestamp (elastic/x-pack-elasticsearch#3073)

add expires_in parameter and change forecast_start_timestamp to timestamp

depends on elastic/machine-learning-cpp#421

Original commit: elastic/x-pack-elasticsearch@3a3eebd49c
This commit is contained in:
Hendrik Muhs 2017-11-21 15:32:06 +01:00 committed by GitHub
parent 601222903d
commit cc66020cf3
8 changed files with 85 additions and 20 deletions

View File

@ -59,6 +59,7 @@ public class ForecastJobAction extends Action<ForecastJobAction.Request, Forecas
public static final ParseField END_TIME = new ParseField("end");
public static final ParseField DURATION = new ParseField("duration");
public static final ParseField EXPIRES_IN = new ParseField("expires_in");
private static final ObjectParser<Request, Void> PARSER = new ObjectParser<>(NAME, Request::new);
@ -66,6 +67,7 @@ public class ForecastJobAction extends Action<ForecastJobAction.Request, Forecas
PARSER.declareString((request, jobId) -> request.jobId = jobId, Job.ID);
PARSER.declareString(Request::setEndTime, END_TIME);
PARSER.declareString(Request::setDuration, DURATION);
PARSER.declareString(Request::setExpiresIn, EXPIRES_IN);
}
public static Request parseRequest(String jobId, XContentParser parser) {
@ -78,6 +80,7 @@ public class ForecastJobAction extends Action<ForecastJobAction.Request, Forecas
private String endTime;
private TimeValue duration;
private TimeValue expiresIn;
Request() {
}
@ -102,11 +105,20 @@ public class ForecastJobAction extends Action<ForecastJobAction.Request, Forecas
this.duration = TimeValue.parseTimeValue(duration, DURATION.getPreferredName());
}
public TimeValue getExpiresIn() {
return expiresIn;
}
public void setExpiresIn(String expiration) {
this.expiresIn = TimeValue.parseTimeValue(expiration, EXPIRES_IN.getPreferredName());
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
this.endTime = in.readOptionalString();
this.duration = in.readOptionalWriteable(TimeValue::new);
this.expiresIn = in.readOptionalWriteable(TimeValue::new);
}
@Override
@ -114,11 +126,12 @@ public class ForecastJobAction extends Action<ForecastJobAction.Request, Forecas
super.writeTo(out);
out.writeOptionalString(endTime);
out.writeOptionalWriteable(duration);
out.writeOptionalWriteable(expiresIn);
}
@Override
public int hashCode() {
return Objects.hash(jobId, endTime, duration);
return Objects.hash(jobId, endTime, duration, expiresIn);
}
@Override
@ -130,7 +143,8 @@ public class ForecastJobAction extends Action<ForecastJobAction.Request, Forecas
return false;
}
Request other = (Request) obj;
return Objects.equals(jobId, other.jobId) && Objects.equals(endTime, other.endTime) && Objects.equals(duration, other.duration);
return Objects.equals(jobId, other.jobId) && Objects.equals(endTime, other.endTime) &&
Objects.equals(duration, other.duration) && Objects.equals(expiresIn, other.expiresIn);
}
@Override
@ -143,6 +157,9 @@ public class ForecastJobAction extends Action<ForecastJobAction.Request, Forecas
if (duration != null) {
builder.field(DURATION.getPreferredName(), duration.getStringRep());
}
if (expiresIn != null) {
builder.field(EXPIRES_IN.getPreferredName(), expiresIn.getStringRep());
}
builder.endObject();
return builder;
}
@ -247,6 +264,9 @@ public class ForecastJobAction extends Action<ForecastJobAction.Request, Forecas
if (request.getDuration() != null) {
paramsBuilder.duration(request.getDuration());
}
if (request.getExpiresIn() != null) {
paramsBuilder.expiresIn(request.getExpiresIn());
}
ForecastParams params = paramsBuilder.build();
processManager.forecastJob(task, params, e -> {

View File

@ -340,11 +340,11 @@ public class ElasticsearchMappings {
.endObject();
// Forecast Stats Output
// re-used: PROCESSING_TIME_MS, PROCESSED_RECORD_COUNT, LATEST_RECORD_TIME
builder.startObject(ForecastRequestStats.START_TIME.getPreferredName())
// re-used: TIMESTAMP, PROCESSING_TIME_MS, PROCESSED_RECORD_COUNT, LATEST_RECORD_TIME
builder.startObject(ForecastRequestStats.END_TIME.getPreferredName())
.field(TYPE, DATE)
.endObject()
.startObject(ForecastRequestStats.END_TIME.getPreferredName())
.startObject(ForecastRequestStats.EXPIRY_TIME.getPreferredName())
.field(TYPE, DATE)
.endObject()
.startObject(ForecastRequestStats.MESSAGES.getPreferredName())

View File

@ -18,12 +18,14 @@ public class ForecastParams {
private final long endTime;
private final long duration;
private final long expiresIn;
private final long forecastId;
private ForecastParams(long forecastId, long endTime, long duration) {
private ForecastParams(long forecastId, long endTime, long duration, long expiresIn) {
this.forecastId = forecastId;
this.endTime = endTime;
this.duration = duration;
this.expiresIn = expiresIn;
}
/**
@ -42,6 +44,14 @@ public class ForecastParams {
return duration;
}
/**
* The forecast expiration in seconds (duration added to start time)
* @return The expiration in seconds
*/
public long getExpiresIn() {
return expiresIn;
}
/**
* The forecast id
*
@ -53,7 +63,7 @@ public class ForecastParams {
@Override
public int hashCode() {
return Objects.hash(forecastId, endTime, duration);
return Objects.hash(forecastId, endTime, duration, expiresIn);
}
@Override
@ -65,9 +75,8 @@ public class ForecastParams {
return false;
}
ForecastParams other = (ForecastParams) obj;
return Objects.equals(forecastId, other.forecastId)
&& Objects.equals(endTime, other.endTime)
&& Objects.equals(duration, other.duration);
return Objects.equals(forecastId, other.forecastId) && Objects.equals(endTime, other.endTime) &&
Objects.equals(duration, other.duration) && Objects.equals(expiresIn, other.expiresIn);
}
public static Builder builder() {
@ -77,6 +86,7 @@ public class ForecastParams {
public static class Builder {
private long endTimeEpochSecs;
private long durationSecs;
private long expiresInSecs;
private long startTime;
private long forecastId;
@ -85,6 +95,9 @@ public class ForecastParams {
endTimeEpochSecs = 0;
forecastId = generateId();
durationSecs = 0;
// because 0 means never expire, the default is -1
expiresInSecs = -1;
}
private long generateId() {
@ -109,12 +122,17 @@ public class ForecastParams {
return this;
}
public Builder expiresIn(TimeValue expiresIn) {
expiresInSecs = expiresIn.seconds();
return this;
}
public ForecastParams build() {
if (endTimeEpochSecs != 0 && durationSecs != 0) {
throw new ElasticsearchParseException(Messages.getMessage(Messages.REST_INVALID_DURATION_AND_ENDTIME));
}
return new ForecastParams(forecastId, endTimeEpochSecs, durationSecs);
return new ForecastParams(forecastId, endTimeEpochSecs, durationSecs, expiresInSecs);
}
}
}

View File

@ -161,6 +161,9 @@ public class ControlMsgToProcessWriter {
if (params.getDuration() != 0) {
builder.field("duration", params.getDuration());
}
if (params.getExpiresIn() != -1) {
builder.field("expires_in", params.getExpiresIn());
}
builder.endObject();
writeMessage(FORECAST_MESSAGE_CODE + builder.string());

View File

@ -36,8 +36,8 @@ public class ForecastRequestStats implements ToXContentObject, Writeable {
public static final ParseField RESULTS_FIELD = new ParseField(RESULT_TYPE_VALUE);
public static final ParseField FORECAST_ID = new ParseField("forecast_id");
public static final ParseField START_TIME = new ParseField("forecast_start_timestamp");
public static final ParseField END_TIME = new ParseField("forecast_end_timestamp");
public static final ParseField EXPIRY_TIME = new ParseField("forecast_expiry_timestamp");
public static final ParseField MESSAGES = new ParseField("forecast_messages");
public static final ParseField PROCESSING_TIME_MS = new ParseField("processing_time_ms");
public static final ParseField PROGRESS = new ParseField("forecast_progress");
@ -55,10 +55,12 @@ public class ForecastRequestStats implements ToXContentObject, Writeable {
PARSER.declareString((modelForecastRequestStats, s) -> {}, Result.RESULT_TYPE);
PARSER.declareLong(ForecastRequestStats::setRecordCount, PROCESSED_RECORD_COUNT);
PARSER.declareStringArray(ForecastRequestStats::setMessages, MESSAGES);
PARSER.declareField(ForecastRequestStats::setStartTimeStamp,
p -> Instant.ofEpochMilli(p.longValue()), START_TIME, ValueType.LONG);
PARSER.declareField(ForecastRequestStats::setTimeStamp,
p -> Instant.ofEpochMilli(p.longValue()), Result.TIMESTAMP, ValueType.LONG);
PARSER.declareField(ForecastRequestStats::setEndTimeStamp,
p -> Instant.ofEpochMilli(p.longValue()), END_TIME, ValueType.LONG);
PARSER.declareField(ForecastRequestStats::setExpiryTimeStamp,
p -> Instant.ofEpochMilli(p.longValue()), EXPIRY_TIME, ValueType.LONG);
PARSER.declareDouble(ForecastRequestStats::setProgress, PROGRESS);
PARSER.declareLong(ForecastRequestStats::setProcessingTime, PROCESSING_TIME_MS);
PARSER.declareField(ForecastRequestStats::setStatus, p -> ForecastRequestStatus.fromString(p.text()), STATUS, ValueType.STRING);
@ -93,6 +95,7 @@ public class ForecastRequestStats implements ToXContentObject, Writeable {
private List<String> messages;
private Instant dateStarted = Instant.EPOCH;
private Instant dateEnded = Instant.EPOCH;
private Instant dateExpires = Instant.EPOCH;
private double progress;
private long processingTime;
private long memoryUsage;
@ -114,6 +117,7 @@ public class ForecastRequestStats implements ToXContentObject, Writeable {
}
dateStarted = Instant.ofEpochMilli(in.readVLong());
dateEnded = Instant.ofEpochMilli(in.readVLong());
dateExpires = Instant.ofEpochMilli(in.readVLong());
progress = in.readDouble();
processingTime = in.readLong();
setMemoryUsage(in.readLong());
@ -133,6 +137,7 @@ public class ForecastRequestStats implements ToXContentObject, Writeable {
}
out.writeVLong(dateStarted.toEpochMilli());
out.writeVLong(dateEnded.toEpochMilli());
out.writeVLong(dateExpires.toEpochMilli());
out.writeDouble(progress);
out.writeLong(processingTime);
out.writeLong(getMemoryUsage());
@ -150,11 +155,14 @@ public class ForecastRequestStats implements ToXContentObject, Writeable {
builder.field(MESSAGES.getPreferredName(), messages);
}
if (dateStarted.equals(Instant.EPOCH) == false) {
builder.field(START_TIME.getPreferredName(), dateStarted.toEpochMilli());
builder.field(Result.TIMESTAMP.getPreferredName(), dateStarted.toEpochMilli());
}
if (dateEnded.equals(Instant.EPOCH) == false) {
builder.field(END_TIME.getPreferredName(), dateEnded.toEpochMilli());
}
if (dateExpires.equals(Instant.EPOCH) == false) {
builder.field(EXPIRY_TIME.getPreferredName(), dateExpires.toEpochMilli());
}
builder.field(PROGRESS.getPreferredName(), progress);
builder.field(PROCESSING_TIME_MS.getPreferredName(), processingTime);
builder.field(MEMORY_USAGE.getPreferredName(), getMemoryUsage());
@ -197,7 +205,7 @@ public class ForecastRequestStats implements ToXContentObject, Writeable {
return dateStarted;
}
public void setStartTimeStamp(Instant dateStarted) {
public void setTimeStamp(Instant dateStarted) {
this.dateStarted = dateStarted;
}
@ -209,6 +217,14 @@ public class ForecastRequestStats implements ToXContentObject, Writeable {
this.dateEnded = dateEnded;
}
public void setExpiryTimeStamp(Instant dateExpires) {
this.dateExpires = dateExpires;
}
public Instant getDateExpired() {
return dateExpires;
}
/**
* Progress information of the ForecastRequest in the range 0 to 1,
* while 1 means finished
@ -263,6 +279,7 @@ public class ForecastRequestStats implements ToXContentObject, Writeable {
Objects.equals(this.messages, that.messages) &&
Objects.equals(this.dateStarted, that.dateStarted) &&
Objects.equals(this.dateEnded, that.dateEnded) &&
Objects.equals(this.dateExpires, that.dateExpires) &&
this.progress == that.progress &&
this.processingTime == that.processingTime &&
this.memoryUsage == that.memoryUsage &&
@ -271,7 +288,7 @@ public class ForecastRequestStats implements ToXContentObject, Writeable {
@Override
public int hashCode() {
return Objects.hash(jobId, forecastId, recordCount, messages, dateStarted, dateEnded, progress,
processingTime, memoryUsage, status);
return Objects.hash(jobId, forecastId, recordCount, messages, dateStarted, dateEnded, dateExpires,
progress, processingTime, memoryUsage, status);
}
}

View File

@ -131,8 +131,9 @@ public final class ReservedFieldNames {
Forecast.FORECAST_PREDICTION.getPreferredName(),
Forecast.FORECAST_ID.getPreferredName(),
ForecastRequestStats.START_TIME.getPreferredName(),
//re-use: ForecastRequestStats.TIMESTAMP
ForecastRequestStats.END_TIME.getPreferredName(),
ForecastRequestStats.EXPIRY_TIME.getPreferredName(),
ForecastRequestStats.MESSAGES.getPreferredName(),
ForecastRequestStats.PROGRESS.getPreferredName(),
ForecastRequestStats.STATUS.getPreferredName(),

View File

@ -46,6 +46,9 @@ public class RestForecastJobAction extends BaseRestHandler {
if (restRequest.hasParam(ForecastJobAction.Request.DURATION.getPreferredName())) {
request.setDuration(restRequest.param(ForecastJobAction.Request.DURATION.getPreferredName()));
}
if (restRequest.hasParam(ForecastJobAction.Request.EXPIRES_IN.getPreferredName())) {
request.setExpiresIn(restRequest.param(ForecastJobAction.Request.EXPIRES_IN.getPreferredName()));
}
}
return channel -> client.execute(ForecastJobAction.INSTANCE, request, new RestToXContentListener<>(channel));

View File

@ -42,11 +42,14 @@ public class ForecastRequestStatsTests extends AbstractSerializingTestCase<Forec
forecastRequestStats.setMessages(list);
}
if (randomBoolean()) {
forecastRequestStats.setStartTimeStamp(Instant.ofEpochMilli(randomNonNegativeLong()));
forecastRequestStats.setTimeStamp(Instant.ofEpochMilli(randomNonNegativeLong()));
}
if (randomBoolean()) {
forecastRequestStats.setEndTimeStamp(Instant.ofEpochMilli(randomNonNegativeLong()));
}
if (randomBoolean()) {
forecastRequestStats.setExpiryTimeStamp(Instant.ofEpochMilli(randomNonNegativeLong()));
}
if (randomBoolean()) {
forecastRequestStats.setProgress(randomDouble());
}