[ML] Accept 'now' in start/end params of start datafeed API (elastic/x-pack-elasticsearch#784)

The params start/end of the start datafeed API now accept
'now' as a value.

Also adds a validation that start must be earlier than end.

relates elastic/x-pack-elasticsearch#781

Original commit: elastic/x-pack-elasticsearch@5396dcb5e8
This commit is contained in:
Dimitris Athanasiou 2017-03-21 17:05:36 +00:00 committed by GitHub
parent 72248adcbb
commit f7c4c754c2
6 changed files with 133 additions and 64 deletions

View File

@ -980,7 +980,6 @@
<suppress files="plugin[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]xpack[/\\]ml[/\\]modelsnapshots[/\\]GetModelSnapshotsTests.java" checks="LineLength" />
<suppress files="plugin[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]xpack[/\\]ml[/\\]notifications[/\\]AuditMessageTests.java" checks="LineLength" />
<suppress files="plugin[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]xpack[/\\]ml[/\\]notifications[/\\]AuditorTests.java" checks="LineLength" />
<suppress files="plugin[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]xpack[/\\]ml[/\\]rest[/\\]datafeeds[/\\]RestStartJobDatafeedActionTests.java" checks="LineLength" />
<suppress files="plugin[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]xpack[/\\]ml[/\\]support[/\\]AbstractSerializingTestCase.java" checks="LineLength" />
<suppress files="plugin[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]xpack[/\\]ml[/\\]support[/\\]AbstractStreamableTestCase.java" checks="LineLength" />
<suppress files="plugin[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]xpack[/\\]ml[/\\]support[/\\]AbstractStreamableXContentTestCase.java" checks="LineLength" />

View File

@ -6,11 +6,13 @@
package org.elasticsearch.xpack.ml.action;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ValidateActions;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterState;
@ -23,6 +25,7 @@ import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.joda.DateMathParser;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
@ -30,6 +33,7 @@ import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.rest.RestStatus;
@ -46,6 +50,7 @@ import org.elasticsearch.xpack.ml.datafeed.DatafeedJobValidator;
import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.ml.utils.DatafeedStateObserver;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
@ -61,6 +66,7 @@ import org.elasticsearch.xpack.persistent.TransportPersistentAction;
import java.io.IOException;
import java.util.Objects;
import java.util.function.LongSupplier;
public class StartDatafeedAction
extends Action<StartDatafeedAction.Request, PersistentActionResponse, StartDatafeedAction.RequestBuilder> {
@ -92,12 +98,24 @@ public class StartDatafeedAction
static {
PARSER.declareString((request, datafeedId) -> request.datafeedId = datafeedId, DatafeedConfig.ID);
PARSER.declareLong((request, startTime) -> request.startTime = startTime, START_TIME);
PARSER.declareLong(Request::setEndTime, END_TIME);
PARSER.declareString((request, startTime) -> request.startTime = parseDateOrThrow(
startTime, START_TIME, () -> System.currentTimeMillis()), START_TIME);
PARSER.declareString(Request::setEndTime, END_TIME);
PARSER.declareString((request, val) ->
request.setTimeout(TimeValue.parseTimeValue(val, TIMEOUT.getPreferredName())), TIMEOUT);
}
static long parseDateOrThrow(String date, ParseField paramName, LongSupplier now) {
DateMathParser dateMathParser = new DateMathParser(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER);
try {
return dateMathParser.parse(date, now);
} catch (Exception e) {
String msg = Messages.getMessage(Messages.REST_INVALID_DATETIME_PARAMS, paramName.getPreferredName(), date);
throw new ElasticsearchParseException(msg, e);
}
}
public static Request fromXContent(XContentParser parser) {
return parseRequest(null, parser);
}
@ -120,6 +138,10 @@ public class StartDatafeedAction
this.startTime = startTime;
}
public Request(String datafeedId, String startTime) {
this(datafeedId, parseDateOrThrow(startTime, START_TIME, () -> System.currentTimeMillis()));
}
public Request(StreamInput in) throws IOException {
readFrom(in);
}
@ -139,6 +161,10 @@ public class StartDatafeedAction
return endTime;
}
public void setEndTime(String endTime) {
setEndTime(parseDateOrThrow(endTime, END_TIME, () -> System.currentTimeMillis()));
}
public void setEndTime(Long endTime) {
this.endTime = endTime;
}
@ -153,7 +179,13 @@ public class StartDatafeedAction
@Override
public ActionRequestValidationException validate() {
return null;
ActionRequestValidationException e = null;
if (endTime != null && endTime <= startTime) {
e = ValidateActions.addValidationError(START_TIME.getPreferredName() + " ["
+ startTime + "] must be earlier than " + END_TIME.getPreferredName()
+ " [" + endTime + "]", e);
}
return e;
}
@Override
@ -188,9 +220,9 @@ public class StartDatafeedAction
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(DatafeedConfig.ID.getPreferredName(), datafeedId);
builder.field(START_TIME.getPreferredName(), startTime);
builder.field(START_TIME.getPreferredName(), String.valueOf(startTime));
if (endTime != null) {
builder.field(END_TIME.getPreferredName(), endTime);
builder.field(END_TIME.getPreferredName(), String.valueOf(endTime));
}
builder.field(TIMEOUT.getPreferredName(), timeout.getStringRep());
builder.endObject();
@ -235,8 +267,8 @@ public class StartDatafeedAction
public DatafeedTask(long id, String type, String action, TaskId parentTaskId, Request request) {
super(id, type, action, "datafeed-" + request.getDatafeedId(), parentTaskId);
this.datafeedId = request.getDatafeedId();
this.startTime = request.startTime;
this.endTime = request.endTime;
this.startTime = request.getStartTime();
this.endTime = request.getEndTime();
}
/* only for testing */

View File

@ -5,13 +5,11 @@
*/
package org.elasticsearch.xpack.ml.rest.datafeeds;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestController;
@ -22,7 +20,6 @@ import org.elasticsearch.rest.action.RestBuilderListener;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.persistent.PersistentActionResponse;
import java.io.IOException;
@ -45,15 +42,11 @@ public class RestStartDatafeedAction extends BaseRestHandler {
XContentParser parser = restRequest.contentOrSourceParamParser();
jobDatafeedRequest = StartDatafeedAction.Request.parseRequest(datafeedId, parser);
} else {
long startTimeMillis = parseDateOrThrow(restRequest.param(StartDatafeedAction.START_TIME.getPreferredName(),
DEFAULT_START), StartDatafeedAction.START_TIME.getPreferredName());
Long endTimeMillis = null;
String startTime = restRequest.param(StartDatafeedAction.START_TIME.getPreferredName(), DEFAULT_START);
jobDatafeedRequest = new StartDatafeedAction.Request(datafeedId, startTime);
if (restRequest.hasParam(StartDatafeedAction.END_TIME.getPreferredName())) {
endTimeMillis = parseDateOrThrow(restRequest.param(StartDatafeedAction.END_TIME.getPreferredName()),
StartDatafeedAction.END_TIME.getPreferredName());
jobDatafeedRequest.setEndTime(restRequest.param(StartDatafeedAction.END_TIME.getPreferredName()));
}
jobDatafeedRequest = new StartDatafeedAction.Request(datafeedId, startTimeMillis);
jobDatafeedRequest.setEndTime(endTimeMillis);
if (restRequest.hasParam(StartDatafeedAction.TIMEOUT.getPreferredName())) {
TimeValue openTimeout = restRequest.paramAsTime(
StartDatafeedAction.TIMEOUT.getPreferredName(), TimeValue.timeValueSeconds(20));
@ -74,13 +67,4 @@ public class RestStartDatafeedAction extends BaseRestHandler {
});
};
}
static long parseDateOrThrow(String date, String paramName) {
try {
return DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parser().parseMillis(date);
} catch (IllegalArgumentException e) {
String msg = Messages.getMessage(Messages.REST_INVALID_DATETIME_PARAMS, paramName, date);
throw new ElasticsearchParseException(msg, e);
}
}
}

View File

@ -5,11 +5,14 @@
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.ml.action.StartDatafeedAction.Request;
import org.elasticsearch.xpack.ml.support.AbstractStreamableXContentTestCase;
import static org.hamcrest.Matchers.equalTo;
public class StartDatafeedActionRequestTests extends AbstractStreamableXContentTestCase<StartDatafeedAction.Request> {
@Override
@ -34,4 +37,18 @@ public class StartDatafeedActionRequestTests extends AbstractStreamableXContentT
return Request.parseRequest(null, parser);
}
public void testParseDateOrThrow() {
assertEquals(0L, StartDatafeedAction.Request.parseDateOrThrow("0",
StartDatafeedAction.START_TIME, () -> System.currentTimeMillis()));
assertEquals(0L, StartDatafeedAction.Request.parseDateOrThrow("1970-01-01T00:00:00Z",
StartDatafeedAction.START_TIME, () -> System.currentTimeMillis()));
assertThat(StartDatafeedAction.Request.parseDateOrThrow("now",
StartDatafeedAction.START_TIME, () -> 123456789L), equalTo(123456789L));
Exception e = expectThrows(ElasticsearchParseException.class,
() -> StartDatafeedAction.Request.parseDateOrThrow("not-a-date",
StartDatafeedAction.START_TIME, () -> System.currentTimeMillis()));
assertEquals("Query param 'start' with value 'not-a-date' cannot be parsed as a date or converted to a number (epoch).",
e.getMessage());
}
}

View File

@ -19,37 +19,32 @@ import java.util.Map;
import static org.mockito.Mockito.mock;
public class RestStartJobDatafeedActionTests extends ESTestCase {
public class RestStartDatafeedActionTests extends ESTestCase {
public void testPrepareRequest() throws Exception {
RestStartDatafeedAction action = new RestStartDatafeedAction(Settings.EMPTY, mock(RestController.class));
RestStartDatafeedAction action = new RestStartDatafeedAction(Settings.EMPTY,
mock(RestController.class));
Map<String, String> params = new HashMap<>();
params.put("start", "not-a-date");
params.put("datafeed_id", "foo-datafeed");
RestRequest restRequest1 = new FakeRestRequest.Builder(NamedXContentRegistry.EMPTY).withParams(params).build();
RestRequest restRequest1 = new FakeRestRequest.Builder(NamedXContentRegistry.EMPTY)
.withParams(params).build();
ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class,
() -> action.prepareRequest(restRequest1, mock(NodeClient.class)));
assertEquals("Query param 'start' with value 'not-a-date' cannot be parsed as a date or converted to a number (epoch).",
assertEquals("Query param 'start' with value 'not-a-date' cannot be parsed as a date or " +
"converted to a number (epoch).",
e.getMessage());
params = new HashMap<>();
params.put("start", "now");
params.put("end", "not-a-date");
params.put("datafeed_id", "foo-datafeed");
RestRequest restRequest2 = new FakeRestRequest.Builder(NamedXContentRegistry.EMPTY).withParams(params).build();
e = expectThrows(ElasticsearchParseException.class, () -> action.prepareRequest(restRequest2, mock(NodeClient.class)));
assertEquals("Query param 'end' with value 'not-a-date' cannot be parsed as a date or converted to a number (epoch).",
e.getMessage());
RestRequest restRequest2 = new FakeRestRequest.Builder(NamedXContentRegistry.EMPTY)
.withParams(params).build();
e = expectThrows(ElasticsearchParseException.class,
() -> action.prepareRequest(restRequest2, mock(NodeClient.class)));
assertEquals("Query param 'end' with value 'not-a-date' cannot be parsed as a date or " +
"converted to a number (epoch).", e.getMessage());
}
public void testParseDateOrThrow() {
assertEquals(0L, RestStartDatafeedAction.parseDateOrThrow("0", "start"));
assertEquals(0L, RestStartDatafeedAction.parseDateOrThrow("1970-01-01T00:00:00Z", "start"));
Exception e = expectThrows(ElasticsearchParseException.class,
() -> RestStartDatafeedAction.parseDateOrThrow("not-a-date", "start"));
assertEquals("Query param 'start' with value 'not-a-date' cannot be parsed as a date or converted to a number (epoch).",
e.getMessage());
}
}

View File

@ -35,6 +35,7 @@ setup:
"indexes":"airline-data",
"types":"response"
}
---
"Test start and stop datafeed happy path":
- do:
@ -42,39 +43,54 @@ setup:
job_id: "datafeed-job"
- do:
xpack.ml.start_datafeed:
"datafeed_id": "datafeed-1"
"start": 0
datafeed_id: "datafeed-1"
start: 0
- do:
xpack.ml.get_datafeed_stats:
datafeed_id: "datafeed-1"
- match: { datafeeds.0.state: started }
- do:
xpack.ml.stop_datafeed:
"datafeed_id": "datafeed-1"
datafeed_id: "datafeed-1"
- do:
xpack.ml.get_datafeed_stats:
datafeed_id: "datafeed-1"
- match: { datafeeds.0.state: stopped }
---
"Test start datafeed given start is now":
- do:
xpack.ml.open_job:
job_id: "datafeed-job"
- do:
xpack.ml.start_datafeed:
datafeed_id: "datafeed-1"
start: "now"
- do:
xpack.ml.get_datafeed_stats:
datafeed_id: "datafeed-1"
- match: { datafeeds.0.state: started }
---
"Test start non existing datafeed":
- do:
catch: missing
xpack.ml.start_datafeed:
"datafeed_id": "non-existing-datafeed"
"start": 0
datafeed_id: "non-existing-datafeed"
start: 0
---
"Test start datafeed job, but not open":
- do:
catch: conflict
xpack.ml.start_datafeed:
"datafeed_id": "datafeed-1"
"start": 0
datafeed_id: "datafeed-1"
start: 0
- do:
catch: /cannot start datafeed, expected job state \[opened\], but got \[closed\]/
xpack.ml.start_datafeed:
"datafeed_id": "datafeed-1"
"start": 0
datafeed_id: "datafeed-1"
start: 0
---
"Test start already started datafeed job":
@ -83,34 +99,60 @@ setup:
job_id: "datafeed-job"
- do:
xpack.ml.start_datafeed:
"datafeed_id": "datafeed-1"
"start": 0
datafeed_id: "datafeed-1"
start: 0
- do:
catch: conflict
xpack.ml.start_datafeed:
"datafeed_id": "datafeed-1"
"start": 0
datafeed_id: "datafeed-1"
start: 0
- do:
catch: /datafeed \[datafeed\-1\] already started, expected datafeed state \[stopped\], but got \[started\]/
xpack.ml.start_datafeed:
"datafeed_id": "datafeed-1"
"start": 0
datafeed_id: "datafeed-1"
start: 0
---
"Test stop non existing datafeed":
- do:
catch: missing
xpack.ml.stop_datafeed:
"datafeed_id": "non-existing-datafeed"
datafeed_id: "non-existing-datafeed"
---
"Test stop already stopped datafeed job":
- do:
catch: conflict
xpack.ml.stop_datafeed:
"datafeed_id": "datafeed-1"
datafeed_id: "datafeed-1"
- do:
catch: /datafeed already stopped, expected datafeed state \[started\], but got \[stopped\]/
xpack.ml.stop_datafeed:
"datafeed_id": "datafeed-1"
datafeed_id: "datafeed-1"
---
"Test start given end earlier than start":
- do:
xpack.ml.open_job:
job_id: "datafeed-job"
- do:
catch: /.* start \[1485910800000\] must be earlier than end \[1485907200000\]/
xpack.ml.start_datafeed:
datafeed_id: "datafeed-1"
start: "2017-02-01T01:00:00Z"
end: "2017-02-01T00:00:00Z"
---
"Test start given end same as start":
- do:
xpack.ml.open_job:
job_id: "datafeed-job"
- do:
catch: /.* start \[1485910800000\] must be earlier than end \[1485910800000\]/
xpack.ml.start_datafeed:
datafeed_id: "datafeed-1"
start: "2017-02-01T01:00:00Z"
end: "2017-02-01T01:00:00Z"