diff --git a/dev-tools/checkstyle_suppressions.xml b/dev-tools/checkstyle_suppressions.xml
index ca21450138b..da1a06944e2 100644
--- a/dev-tools/checkstyle_suppressions.xml
+++ b/dev-tools/checkstyle_suppressions.xml
@@ -980,7 +980,6 @@
-
diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java
index d522206784d..4ed713913d0 100644
--- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java
+++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java
@@ -6,11 +6,13 @@
package org.elasticsearch.xpack.ml.action;
import org.apache.logging.log4j.Logger;
+import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionRequestValidationException;
+import org.elasticsearch.action.ValidateActions;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterState;
@@ -23,6 +25,7 @@ import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.joda.DateMathParser;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
@@ -30,6 +33,7 @@ import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.rest.RestStatus;
@@ -46,6 +50,7 @@ import org.elasticsearch.xpack.ml.datafeed.DatafeedJobValidator;
import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState;
+import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.ml.utils.DatafeedStateObserver;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
@@ -61,6 +66,7 @@ import org.elasticsearch.xpack.persistent.TransportPersistentAction;
import java.io.IOException;
import java.util.Objects;
+import java.util.function.LongSupplier;
public class StartDatafeedAction
extends Action {
@@ -92,12 +98,24 @@ public class StartDatafeedAction
static {
PARSER.declareString((request, datafeedId) -> request.datafeedId = datafeedId, DatafeedConfig.ID);
- PARSER.declareLong((request, startTime) -> request.startTime = startTime, START_TIME);
- PARSER.declareLong(Request::setEndTime, END_TIME);
+ PARSER.declareString((request, startTime) -> request.startTime = parseDateOrThrow(
+ startTime, START_TIME, () -> System.currentTimeMillis()), START_TIME);
+ PARSER.declareString(Request::setEndTime, END_TIME);
PARSER.declareString((request, val) ->
request.setTimeout(TimeValue.parseTimeValue(val, TIMEOUT.getPreferredName())), TIMEOUT);
}
+ static long parseDateOrThrow(String date, ParseField paramName, LongSupplier now) {
+ DateMathParser dateMathParser = new DateMathParser(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER);
+
+ try {
+ return dateMathParser.parse(date, now);
+ } catch (Exception e) {
+ String msg = Messages.getMessage(Messages.REST_INVALID_DATETIME_PARAMS, paramName.getPreferredName(), date);
+ throw new ElasticsearchParseException(msg, e);
+ }
+ }
+
public static Request fromXContent(XContentParser parser) {
return parseRequest(null, parser);
}
@@ -120,6 +138,10 @@ public class StartDatafeedAction
this.startTime = startTime;
}
+ public Request(String datafeedId, String startTime) {
+ this(datafeedId, parseDateOrThrow(startTime, START_TIME, () -> System.currentTimeMillis()));
+ }
+
public Request(StreamInput in) throws IOException {
readFrom(in);
}
@@ -139,6 +161,10 @@ public class StartDatafeedAction
return endTime;
}
+ public void setEndTime(String endTime) {
+ setEndTime(parseDateOrThrow(endTime, END_TIME, () -> System.currentTimeMillis()));
+ }
+
public void setEndTime(Long endTime) {
this.endTime = endTime;
}
@@ -153,7 +179,13 @@ public class StartDatafeedAction
@Override
public ActionRequestValidationException validate() {
- return null;
+ ActionRequestValidationException e = null;
+ if (endTime != null && endTime <= startTime) {
+ e = ValidateActions.addValidationError(START_TIME.getPreferredName() + " ["
+ + startTime + "] must be earlier than " + END_TIME.getPreferredName()
+ + " [" + endTime + "]", e);
+ }
+ return e;
}
@Override
@@ -188,9 +220,9 @@ public class StartDatafeedAction
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(DatafeedConfig.ID.getPreferredName(), datafeedId);
- builder.field(START_TIME.getPreferredName(), startTime);
+ builder.field(START_TIME.getPreferredName(), String.valueOf(startTime));
if (endTime != null) {
- builder.field(END_TIME.getPreferredName(), endTime);
+ builder.field(END_TIME.getPreferredName(), String.valueOf(endTime));
}
builder.field(TIMEOUT.getPreferredName(), timeout.getStringRep());
builder.endObject();
@@ -235,8 +267,8 @@ public class StartDatafeedAction
public DatafeedTask(long id, String type, String action, TaskId parentTaskId, Request request) {
super(id, type, action, "datafeed-" + request.getDatafeedId(), parentTaskId);
this.datafeedId = request.getDatafeedId();
- this.startTime = request.startTime;
- this.endTime = request.endTime;
+ this.startTime = request.getStartTime();
+ this.endTime = request.getEndTime();
}
/* only for testing */
diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestStartDatafeedAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestStartDatafeedAction.java
index cd0fd5c151b..bf349efa5bd 100644
--- a/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestStartDatafeedAction.java
+++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestStartDatafeedAction.java
@@ -5,13 +5,11 @@
*/
package org.elasticsearch.xpack.ml.rest.datafeeds;
-import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
-import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestController;
@@ -22,7 +20,6 @@ import org.elasticsearch.rest.action.RestBuilderListener;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
-import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.persistent.PersistentActionResponse;
import java.io.IOException;
@@ -45,15 +42,11 @@ public class RestStartDatafeedAction extends BaseRestHandler {
XContentParser parser = restRequest.contentOrSourceParamParser();
jobDatafeedRequest = StartDatafeedAction.Request.parseRequest(datafeedId, parser);
} else {
- long startTimeMillis = parseDateOrThrow(restRequest.param(StartDatafeedAction.START_TIME.getPreferredName(),
- DEFAULT_START), StartDatafeedAction.START_TIME.getPreferredName());
- Long endTimeMillis = null;
+ String startTime = restRequest.param(StartDatafeedAction.START_TIME.getPreferredName(), DEFAULT_START);
+ jobDatafeedRequest = new StartDatafeedAction.Request(datafeedId, startTime);
if (restRequest.hasParam(StartDatafeedAction.END_TIME.getPreferredName())) {
- endTimeMillis = parseDateOrThrow(restRequest.param(StartDatafeedAction.END_TIME.getPreferredName()),
- StartDatafeedAction.END_TIME.getPreferredName());
+ jobDatafeedRequest.setEndTime(restRequest.param(StartDatafeedAction.END_TIME.getPreferredName()));
}
- jobDatafeedRequest = new StartDatafeedAction.Request(datafeedId, startTimeMillis);
- jobDatafeedRequest.setEndTime(endTimeMillis);
if (restRequest.hasParam(StartDatafeedAction.TIMEOUT.getPreferredName())) {
TimeValue openTimeout = restRequest.paramAsTime(
StartDatafeedAction.TIMEOUT.getPreferredName(), TimeValue.timeValueSeconds(20));
@@ -74,13 +67,4 @@ public class RestStartDatafeedAction extends BaseRestHandler {
});
};
}
-
- static long parseDateOrThrow(String date, String paramName) {
- try {
- return DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parser().parseMillis(date);
- } catch (IllegalArgumentException e) {
- String msg = Messages.getMessage(Messages.REST_INVALID_DATETIME_PARAMS, paramName, date);
- throw new ElasticsearchParseException(msg, e);
- }
- }
}
diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionRequestTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionRequestTests.java
index 9483857a4d3..33331b81fb5 100644
--- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionRequestTests.java
+++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionRequestTests.java
@@ -5,11 +5,14 @@
*/
package org.elasticsearch.xpack.ml.action;
+import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.ml.action.StartDatafeedAction.Request;
import org.elasticsearch.xpack.ml.support.AbstractStreamableXContentTestCase;
+import static org.hamcrest.Matchers.equalTo;
+
public class StartDatafeedActionRequestTests extends AbstractStreamableXContentTestCase {
@Override
@@ -34,4 +37,18 @@ public class StartDatafeedActionRequestTests extends AbstractStreamableXContentT
return Request.parseRequest(null, parser);
}
+ public void testParseDateOrThrow() {
+ assertEquals(0L, StartDatafeedAction.Request.parseDateOrThrow("0",
+ StartDatafeedAction.START_TIME, () -> System.currentTimeMillis()));
+ assertEquals(0L, StartDatafeedAction.Request.parseDateOrThrow("1970-01-01T00:00:00Z",
+ StartDatafeedAction.START_TIME, () -> System.currentTimeMillis()));
+ assertThat(StartDatafeedAction.Request.parseDateOrThrow("now",
+ StartDatafeedAction.START_TIME, () -> 123456789L), equalTo(123456789L));
+
+ Exception e = expectThrows(ElasticsearchParseException.class,
+ () -> StartDatafeedAction.Request.parseDateOrThrow("not-a-date",
+ StartDatafeedAction.START_TIME, () -> System.currentTimeMillis()));
+ assertEquals("Query param 'start' with value 'not-a-date' cannot be parsed as a date or converted to a number (epoch).",
+ e.getMessage());
+ }
}
diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestStartJobDatafeedActionTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestStartDatafeedActionTests.java
similarity index 60%
rename from plugin/src/test/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestStartJobDatafeedActionTests.java
rename to plugin/src/test/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestStartDatafeedActionTests.java
index ed2e02ebb2f..497ad633b19 100644
--- a/plugin/src/test/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestStartJobDatafeedActionTests.java
+++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestStartDatafeedActionTests.java
@@ -19,37 +19,32 @@ import java.util.Map;
import static org.mockito.Mockito.mock;
-public class RestStartJobDatafeedActionTests extends ESTestCase {
+public class RestStartDatafeedActionTests extends ESTestCase {
public void testPrepareRequest() throws Exception {
- RestStartDatafeedAction action = new RestStartDatafeedAction(Settings.EMPTY, mock(RestController.class));
+ RestStartDatafeedAction action = new RestStartDatafeedAction(Settings.EMPTY,
+ mock(RestController.class));
Map params = new HashMap<>();
params.put("start", "not-a-date");
params.put("datafeed_id", "foo-datafeed");
- RestRequest restRequest1 = new FakeRestRequest.Builder(NamedXContentRegistry.EMPTY).withParams(params).build();
+ RestRequest restRequest1 = new FakeRestRequest.Builder(NamedXContentRegistry.EMPTY)
+ .withParams(params).build();
ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class,
() -> action.prepareRequest(restRequest1, mock(NodeClient.class)));
- assertEquals("Query param 'start' with value 'not-a-date' cannot be parsed as a date or converted to a number (epoch).",
+ assertEquals("Query param 'start' with value 'not-a-date' cannot be parsed as a date or " +
+ "converted to a number (epoch).",
e.getMessage());
params = new HashMap<>();
+ params.put("start", "now");
params.put("end", "not-a-date");
params.put("datafeed_id", "foo-datafeed");
- RestRequest restRequest2 = new FakeRestRequest.Builder(NamedXContentRegistry.EMPTY).withParams(params).build();
- e = expectThrows(ElasticsearchParseException.class, () -> action.prepareRequest(restRequest2, mock(NodeClient.class)));
- assertEquals("Query param 'end' with value 'not-a-date' cannot be parsed as a date or converted to a number (epoch).",
- e.getMessage());
+ RestRequest restRequest2 = new FakeRestRequest.Builder(NamedXContentRegistry.EMPTY)
+ .withParams(params).build();
+ e = expectThrows(ElasticsearchParseException.class,
+ () -> action.prepareRequest(restRequest2, mock(NodeClient.class)));
+ assertEquals("Query param 'end' with value 'not-a-date' cannot be parsed as a date or " +
+ "converted to a number (epoch).", e.getMessage());
}
-
- public void testParseDateOrThrow() {
- assertEquals(0L, RestStartDatafeedAction.parseDateOrThrow("0", "start"));
- assertEquals(0L, RestStartDatafeedAction.parseDateOrThrow("1970-01-01T00:00:00Z", "start"));
-
- Exception e = expectThrows(ElasticsearchParseException.class,
- () -> RestStartDatafeedAction.parseDateOrThrow("not-a-date", "start"));
- assertEquals("Query param 'start' with value 'not-a-date' cannot be parsed as a date or converted to a number (epoch).",
- e.getMessage());
- }
-
}
diff --git a/plugin/src/test/resources/rest-api-spec/test/ml/start_stop_datafeed.yaml b/plugin/src/test/resources/rest-api-spec/test/ml/start_stop_datafeed.yaml
index 6de48c236b3..2d9eba3b25e 100644
--- a/plugin/src/test/resources/rest-api-spec/test/ml/start_stop_datafeed.yaml
+++ b/plugin/src/test/resources/rest-api-spec/test/ml/start_stop_datafeed.yaml
@@ -35,6 +35,7 @@ setup:
"indexes":"airline-data",
"types":"response"
}
+
---
"Test start and stop datafeed happy path":
- do:
@@ -42,39 +43,54 @@ setup:
job_id: "datafeed-job"
- do:
xpack.ml.start_datafeed:
- "datafeed_id": "datafeed-1"
- "start": 0
+ datafeed_id: "datafeed-1"
+ start: 0
- do:
xpack.ml.get_datafeed_stats:
datafeed_id: "datafeed-1"
- match: { datafeeds.0.state: started }
- do:
xpack.ml.stop_datafeed:
- "datafeed_id": "datafeed-1"
+ datafeed_id: "datafeed-1"
- do:
xpack.ml.get_datafeed_stats:
datafeed_id: "datafeed-1"
- match: { datafeeds.0.state: stopped }
+
+---
+"Test start datafeed given start is now":
+ - do:
+ xpack.ml.open_job:
+ job_id: "datafeed-job"
+ - do:
+ xpack.ml.start_datafeed:
+ datafeed_id: "datafeed-1"
+ start: "now"
+ - do:
+ xpack.ml.get_datafeed_stats:
+ datafeed_id: "datafeed-1"
+ - match: { datafeeds.0.state: started }
+
---
"Test start non existing datafeed":
- do:
catch: missing
xpack.ml.start_datafeed:
- "datafeed_id": "non-existing-datafeed"
- "start": 0
+ datafeed_id: "non-existing-datafeed"
+ start: 0
---
"Test start datafeed job, but not open":
- do:
catch: conflict
xpack.ml.start_datafeed:
- "datafeed_id": "datafeed-1"
- "start": 0
+ datafeed_id: "datafeed-1"
+ start: 0
- do:
catch: /cannot start datafeed, expected job state \[opened\], but got \[closed\]/
xpack.ml.start_datafeed:
- "datafeed_id": "datafeed-1"
- "start": 0
+ datafeed_id: "datafeed-1"
+ start: 0
---
"Test start already started datafeed job":
@@ -83,34 +99,60 @@ setup:
job_id: "datafeed-job"
- do:
xpack.ml.start_datafeed:
- "datafeed_id": "datafeed-1"
- "start": 0
+ datafeed_id: "datafeed-1"
+ start: 0
- do:
catch: conflict
xpack.ml.start_datafeed:
- "datafeed_id": "datafeed-1"
- "start": 0
+ datafeed_id: "datafeed-1"
+ start: 0
- do:
catch: /datafeed \[datafeed\-1\] already started, expected datafeed state \[stopped\], but got \[started\]/
xpack.ml.start_datafeed:
- "datafeed_id": "datafeed-1"
- "start": 0
+ datafeed_id: "datafeed-1"
+ start: 0
---
"Test stop non existing datafeed":
- do:
catch: missing
xpack.ml.stop_datafeed:
- "datafeed_id": "non-existing-datafeed"
+ datafeed_id: "non-existing-datafeed"
---
"Test stop already stopped datafeed job":
- do:
catch: conflict
xpack.ml.stop_datafeed:
- "datafeed_id": "datafeed-1"
+ datafeed_id: "datafeed-1"
- do:
catch: /datafeed already stopped, expected datafeed state \[started\], but got \[stopped\]/
xpack.ml.stop_datafeed:
- "datafeed_id": "datafeed-1"
+ datafeed_id: "datafeed-1"
+
+---
+"Test start given end earlier than start":
+ - do:
+ xpack.ml.open_job:
+ job_id: "datafeed-job"
+
+ - do:
+ catch: /.* start \[1485910800000\] must be earlier than end \[1485907200000\]/
+ xpack.ml.start_datafeed:
+ datafeed_id: "datafeed-1"
+ start: "2017-02-01T01:00:00Z"
+ end: "2017-02-01T00:00:00Z"
+
+---
+"Test start given end same as start":
+ - do:
+ xpack.ml.open_job:
+ job_id: "datafeed-job"
+
+ - do:
+ catch: /.* start \[1485910800000\] must be earlier than end \[1485910800000\]/
+ xpack.ml.start_datafeed:
+ datafeed_id: "datafeed-1"
+ start: "2017-02-01T01:00:00Z"
+ end: "2017-02-01T01:00:00Z"