From bf0a0f74dae85e2ca7fe6c07a3ab585b7069a294 Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Fri, 21 Sep 2018 05:56:38 -0700 Subject: [PATCH] HLRC: ML start data feed API (#33898) * HLRC: ML start data feed API --- .../client/MLRequestConverters.java | 14 ++ .../client/MachineLearningClient.java | 42 +++++ .../client/ml/StartDatafeedRequest.java | 160 ++++++++++++++++++ .../client/ml/StartDatafeedResponse.java | 93 ++++++++++ .../client/MLRequestConvertersTests.java | 15 ++ .../client/MachineLearningIT.java | 81 +++++++++ .../MlClientDocumentationIT.java | 66 ++++++++ .../client/ml/StartDatafeedRequestTests.java | 61 +++++++ .../client/ml/StartDatafeedResponseTests.java | 42 +++++ .../high-level/ml/start-datafeed.asciidoc | 71 ++++++++ .../high-level/supported-apis.asciidoc | 2 + 11 files changed, 647 insertions(+) create mode 100644 client/rest-high-level/src/main/java/org/elasticsearch/client/ml/StartDatafeedRequest.java create mode 100644 client/rest-high-level/src/main/java/org/elasticsearch/client/ml/StartDatafeedResponse.java create mode 100644 client/rest-high-level/src/test/java/org/elasticsearch/client/ml/StartDatafeedRequestTests.java create mode 100644 client/rest-high-level/src/test/java/org/elasticsearch/client/ml/StartDatafeedResponseTests.java create mode 100644 docs/java-rest/high-level/ml/start-datafeed.asciidoc diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/MLRequestConverters.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/MLRequestConverters.java index ed83e1b4aba..3b7be24d454 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/MLRequestConverters.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/MLRequestConverters.java @@ -48,6 +48,7 @@ import org.elasticsearch.client.ml.PostDataRequest; import org.elasticsearch.client.ml.PutCalendarRequest; import org.elasticsearch.client.ml.PutDatafeedRequest; import org.elasticsearch.client.ml.PutJobRequest; +import org.elasticsearch.client.ml.StartDatafeedRequest; import org.elasticsearch.client.ml.UpdateJobRequest; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesReference; @@ -231,6 +232,19 @@ final class MLRequestConverters { return request; } + static Request startDatafeed(StartDatafeedRequest startDatafeedRequest) throws IOException { + String endpoint = new EndpointBuilder() + .addPathPartAsIs("_xpack") + .addPathPartAsIs("ml") + .addPathPartAsIs("datafeeds") + .addPathPart(startDatafeedRequest.getDatafeedId()) + .addPathPartAsIs("_start") + .build(); + Request request = new Request(HttpPost.METHOD_NAME, endpoint); + request.setEntity(createEntity(startDatafeedRequest, REQUEST_BODY_CONTENT_TYPE)); + return request; + } + static Request deleteForecast(DeleteForecastRequest deleteForecastRequest) { String endpoint = new EndpointBuilder() .addPathPartAsIs("_xpack") diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/MachineLearningClient.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/MachineLearningClient.java index 06df9b31488..39a748d39ba 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/MachineLearningClient.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/MachineLearningClient.java @@ -58,6 +58,8 @@ import org.elasticsearch.client.ml.PutDatafeedRequest; import org.elasticsearch.client.ml.PutDatafeedResponse; import org.elasticsearch.client.ml.PutJobRequest; import org.elasticsearch.client.ml.PutJobResponse; +import org.elasticsearch.client.ml.StartDatafeedRequest; +import org.elasticsearch.client.ml.StartDatafeedResponse; import org.elasticsearch.client.ml.UpdateJobRequest; import org.elasticsearch.client.ml.job.stats.JobStats; @@ -565,6 +567,46 @@ public final class MachineLearningClient { Collections.emptySet()); } + /** + * Starts the given Machine Learning Datafeed + *

+ * For additional info + * see + * ML Start Datafeed documentation + * + * @param request The request to start the datafeed + * @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized + * @return action acknowledgement + * @throws IOException when there is a serialization issue sending the request or receiving the response + */ + public StartDatafeedResponse startDatafeed(StartDatafeedRequest request, RequestOptions options) throws IOException { + return restHighLevelClient.performRequestAndParseEntity(request, + MLRequestConverters::startDatafeed, + options, + StartDatafeedResponse::fromXContent, + Collections.emptySet()); + } + + /** + * Starts the given Machine Learning Datafeed asynchronously and notifies the listener on completion + *

+ * For additional info + * see + * ML Start Datafeed documentation + * + * @param request The request to start the datafeed + * @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized + * @param listener Listener to be notified upon request completion + */ + public void startDatafeedAsync(StartDatafeedRequest request, RequestOptions options, ActionListener listener) { + restHighLevelClient.performRequestAsyncAndParseEntity(request, + MLRequestConverters::startDatafeed, + options, + StartDatafeedResponse::fromXContent, + listener, + Collections.emptySet()); + } + /** * Updates a Machine Learning {@link org.elasticsearch.client.ml.job.config.Job} *

diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/StartDatafeedRequest.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/StartDatafeedRequest.java new file mode 100644 index 00000000000..4ee6d747e57 --- /dev/null +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/StartDatafeedRequest.java @@ -0,0 +1,160 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.client.ml; + +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.client.ml.datafeed.DatafeedConfig; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Objects; + +/** + * Request to start a Datafeed + */ +public class StartDatafeedRequest extends ActionRequest implements ToXContentObject { + + public static final ParseField START = new ParseField("start"); + public static final ParseField END = new ParseField("end"); + public static final ParseField TIMEOUT = new ParseField("timeout"); + + public static ConstructingObjectParser PARSER = + new ConstructingObjectParser<>("start_datafeed_request", a -> new StartDatafeedRequest((String)a[0])); + + static { + PARSER.declareString(ConstructingObjectParser.constructorArg(), DatafeedConfig.ID); + PARSER.declareString(StartDatafeedRequest::setStart, START); + PARSER.declareString(StartDatafeedRequest::setEnd, END); + PARSER.declareString((params, val) -> + params.setTimeout(TimeValue.parseTimeValue(val, TIMEOUT.getPreferredName())), TIMEOUT); + } + + private final String datafeedId; + private String start; + private String end; + private TimeValue timeout; + + /** + * Create a new StartDatafeedRequest for the given DatafeedId + * + * @param datafeedId non-null existing Datafeed ID + */ + public StartDatafeedRequest(String datafeedId) { + this.datafeedId = Objects.requireNonNull(datafeedId, "[datafeed_id] must not be null"); + } + + public String getDatafeedId() { + return datafeedId; + } + + public String getStart() { + return start; + } + + /** + * The time that the datafeed should begin. This value is inclusive. + * + * If you specify a start value that is earlier than the timestamp of the latest processed record, + * the datafeed continues from 1 millisecond after the timestamp of the latest processed record. + * + * If you do not specify a start time and the datafeed is associated with a new job, + * the analysis starts from the earliest time for which data is available. + * + * @param start String representation of a timestamp; may be an epoch seconds, epoch millis or an ISO 8601 string + */ + public void setStart(String start) { + this.start = start; + } + + public String getEnd() { + return end; + } + + /** + * The time that the datafeed should end. This value is exclusive. + * If you do not specify an end time, the datafeed runs continuously. + * + * @param end String representation of a timestamp; may be an epoch seconds, epoch millis or an ISO 8601 string + */ + public void setEnd(String end) { + this.end = end; + } + + public TimeValue getTimeout() { + return timeout; + } + + /** + * Indicates how long to wait for the cluster to respond to the request. + * + * @param timeout TimeValue for how long to wait for a response from the cluster + */ + public void setTimeout(TimeValue timeout) { + this.timeout = timeout; + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public int hashCode() { + return Objects.hash(datafeedId, start, end, timeout); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + + if (obj == null || obj.getClass() != getClass()) { + return false; + } + + StartDatafeedRequest other = (StartDatafeedRequest) obj; + return Objects.equals(datafeedId, other.datafeedId) && + Objects.equals(start, other.start) && + Objects.equals(end, other.end) && + Objects.equals(timeout, other.timeout); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(DatafeedConfig.ID.getPreferredName(), datafeedId); + if (start != null) { + builder.field(START.getPreferredName(), start); + } + if (end != null) { + builder.field(END.getPreferredName(), end); + } + if (timeout != null) { + builder.field(TIMEOUT.getPreferredName(), timeout.getStringRep()); + } + builder.endObject(); + return builder; + } +} diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/StartDatafeedResponse.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/StartDatafeedResponse.java new file mode 100644 index 00000000000..d4ed46c5316 --- /dev/null +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/StartDatafeedResponse.java @@ -0,0 +1,93 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.client.ml; + +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; + +import java.io.IOException; +import java.util.Objects; + +/** + * Response indicating if the Machine Learning Datafeed is now started or not + */ +public class StartDatafeedResponse extends ActionResponse implements ToXContentObject { + + private static final ParseField STARTED = new ParseField("started"); + + public static final ConstructingObjectParser PARSER = + new ConstructingObjectParser<>( + "start_datafeed_response", + true, + (a) -> new StartDatafeedResponse((Boolean)a[0])); + + static { + PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), STARTED); + } + + private final boolean started; + + public StartDatafeedResponse(boolean started) { + this.started = started; + } + + public static StartDatafeedResponse fromXContent(XContentParser parser) throws IOException { + return PARSER.parse(parser, null); + } + + /** + * Has the Datafeed started or not + * + * @return boolean value indicating the Datafeed started status + */ + public boolean isStarted() { + return started; + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + + if (other == null || getClass() != other.getClass()) { + return false; + } + + StartDatafeedResponse that = (StartDatafeedResponse) other; + return isStarted() == that.isStarted(); + } + + @Override + public int hashCode() { + return Objects.hash(isStarted()); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(STARTED.getPreferredName(), started); + builder.endObject(); + return builder; + } +} diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/MLRequestConvertersTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/MLRequestConvertersTests.java index 819e2f63449..b7e795d2b76 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/MLRequestConvertersTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/MLRequestConvertersTests.java @@ -44,6 +44,8 @@ import org.elasticsearch.client.ml.PostDataRequest; import org.elasticsearch.client.ml.PutCalendarRequest; import org.elasticsearch.client.ml.PutDatafeedRequest; import org.elasticsearch.client.ml.PutJobRequest; +import org.elasticsearch.client.ml.StartDatafeedRequest; +import org.elasticsearch.client.ml.StartDatafeedRequestTests; import org.elasticsearch.client.ml.UpdateJobRequest; import org.elasticsearch.client.ml.calendars.Calendar; import org.elasticsearch.client.ml.calendars.CalendarTests; @@ -261,6 +263,19 @@ public class MLRequestConvertersTests extends ESTestCase { assertEquals(Boolean.toString(true), request.getParameters().get("force")); } + public void testStartDatafeed() throws Exception { + String datafeedId = DatafeedConfigTests.randomValidDatafeedId(); + StartDatafeedRequest datafeedRequest = StartDatafeedRequestTests.createRandomInstance(datafeedId); + + Request request = MLRequestConverters.startDatafeed(datafeedRequest); + assertEquals(HttpPost.METHOD_NAME, request.getMethod()); + assertEquals("/_xpack/ml/datafeeds/" + datafeedId + "/_start", request.getEndpoint()); + try (XContentParser parser = createParser(JsonXContent.jsonXContent, request.getEntity().getContent())) { + StartDatafeedRequest parsedDatafeedRequest = StartDatafeedRequest.PARSER.apply(parser, null); + assertThat(parsedDatafeedRequest, equalTo(datafeedRequest)); + } + } + public void testDeleteForecast() { String jobId = randomAlphaOfLength(10); DeleteForecastRequest deleteForecastRequest = new DeleteForecastRequest(jobId); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java index 19ca737d6e9..141b201a159 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java @@ -20,8 +20,12 @@ package org.elasticsearch.client; import com.carrotsearch.randomizedtesting.generators.CodepointSetGenerator; import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.ml.CloseJobRequest; import org.elasticsearch.client.ml.CloseJobResponse; @@ -51,6 +55,8 @@ import org.elasticsearch.client.ml.PutDatafeedRequest; import org.elasticsearch.client.ml.PutDatafeedResponse; import org.elasticsearch.client.ml.PutJobRequest; import org.elasticsearch.client.ml.PutJobResponse; +import org.elasticsearch.client.ml.StartDatafeedRequest; +import org.elasticsearch.client.ml.StartDatafeedResponse; import org.elasticsearch.client.ml.UpdateJobRequest; import org.elasticsearch.client.ml.calendars.Calendar; import org.elasticsearch.client.ml.calendars.CalendarTests; @@ -63,6 +69,7 @@ import org.elasticsearch.client.ml.job.config.JobState; import org.elasticsearch.client.ml.job.config.JobUpdate; import org.elasticsearch.client.ml.job.stats.JobStats; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.rest.RestStatus; import org.junit.After; @@ -416,6 +423,80 @@ public class MachineLearningIT extends ESRestHighLevelClientTestCase { assertTrue(response.isAcknowledged()); } + public void testStartDatafeed() throws Exception { + String jobId = "test-start-datafeed"; + String indexName = "start_data_1"; + + // Set up the index and docs + CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName); + createIndexRequest.mapping("doc", "timestamp", "type=date", "total", "type=long"); + highLevelClient().indices().create(createIndexRequest, RequestOptions.DEFAULT); + BulkRequest bulk = new BulkRequest(); + bulk.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + long now = System.currentTimeMillis(); + long oneDayAgo = now - 86400000; + int i = 0; + long dayAgoCopy = oneDayAgo; + while(dayAgoCopy < now) { + IndexRequest doc = new IndexRequest(); + doc.index(indexName); + doc.type("doc"); + doc.id("id" + i); + doc.source("{\"total\":" +randomInt(1000) + ",\"timestamp\":"+ dayAgoCopy +"}", XContentType.JSON); + bulk.add(doc); + dayAgoCopy += 1000000; + i++; + } + highLevelClient().bulk(bulk, RequestOptions.DEFAULT); + final long totalDocCount = i; + + // create the job and the datafeed + Job job = buildJob(jobId); + MachineLearningClient machineLearningClient = highLevelClient().machineLearning(); + machineLearningClient.putJob(new PutJobRequest(job), RequestOptions.DEFAULT); + machineLearningClient.openJob(new OpenJobRequest(jobId), RequestOptions.DEFAULT); + + String datafeedId = jobId + "-feed"; + DatafeedConfig datafeed = DatafeedConfig.builder(datafeedId, jobId) + .setIndices(indexName) + .setQueryDelay(TimeValue.timeValueSeconds(1)) + .setTypes(Arrays.asList("doc")) + .setFrequency(TimeValue.timeValueSeconds(1)).build(); + machineLearningClient.putDatafeed(new PutDatafeedRequest(datafeed), RequestOptions.DEFAULT); + + + StartDatafeedRequest startDatafeedRequest = new StartDatafeedRequest(datafeedId); + startDatafeedRequest.setStart(String.valueOf(oneDayAgo)); + // Should only process two documents + startDatafeedRequest.setEnd(String.valueOf(oneDayAgo + 2000000)); + StartDatafeedResponse response = execute(startDatafeedRequest, + machineLearningClient::startDatafeed, + machineLearningClient::startDatafeedAsync); + + assertTrue(response.isStarted()); + + assertBusy(() -> { + JobStats stats = machineLearningClient.getJobStats(new GetJobStatsRequest(jobId), RequestOptions.DEFAULT).jobStats().get(0); + assertEquals(2L, stats.getDataCounts().getInputRecordCount()); + assertEquals(JobState.CLOSED, stats.getState()); + }, 30, TimeUnit.SECONDS); + + machineLearningClient.openJob(new OpenJobRequest(jobId), RequestOptions.DEFAULT); + StartDatafeedRequest wholeDataFeed = new StartDatafeedRequest(datafeedId); + // Process all documents and end the stream + wholeDataFeed.setEnd(String.valueOf(now)); + StartDatafeedResponse wholeResponse = execute(wholeDataFeed, + machineLearningClient::startDatafeed, + machineLearningClient::startDatafeedAsync); + assertTrue(wholeResponse.isStarted()); + + assertBusy(() -> { + JobStats stats = machineLearningClient.getJobStats(new GetJobStatsRequest(jobId), RequestOptions.DEFAULT).jobStats().get(0); + assertEquals(totalDocCount, stats.getDataCounts().getInputRecordCount()); + assertEquals(JobState.CLOSED, stats.getState()); + }, 30, TimeUnit.SECONDS); + } + public void testDeleteForecast() throws Exception { String jobId = "test-delete-forecast"; diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java index 36d5a08d6d3..c561f159475 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java @@ -20,6 +20,7 @@ package org.elasticsearch.client.documentation; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.LatchedActionListener; +import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetResponse; @@ -70,6 +71,8 @@ import org.elasticsearch.client.ml.PutDatafeedRequest; import org.elasticsearch.client.ml.PutDatafeedResponse; import org.elasticsearch.client.ml.PutJobRequest; import org.elasticsearch.client.ml.PutJobResponse; +import org.elasticsearch.client.ml.StartDatafeedRequest; +import org.elasticsearch.client.ml.StartDatafeedResponse; import org.elasticsearch.client.ml.UpdateJobRequest; import org.elasticsearch.client.ml.calendars.Calendar; import org.elasticsearch.client.ml.datafeed.ChunkingConfig; @@ -703,6 +706,69 @@ public class MlClientDocumentationIT extends ESRestHighLevelClientTestCase { } } + public void testStartDatafeed() throws Exception { + RestHighLevelClient client = highLevelClient(); + + Job job = MachineLearningIT.buildJob("start-datafeed-job"); + client.machineLearning().putJob(new PutJobRequest(job), RequestOptions.DEFAULT); + String datafeedId = job.getId() + "-feed"; + String indexName = "start_data_2"; + CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName); + createIndexRequest.mapping("doc", "timestamp", "type=date", "total", "type=long"); + highLevelClient().indices().create(createIndexRequest, RequestOptions.DEFAULT); + DatafeedConfig datafeed = DatafeedConfig.builder(datafeedId, job.getId()) + .setTypes(Arrays.asList("doc")) + .setIndices(indexName) + .build(); + client.machineLearning().putDatafeed(new PutDatafeedRequest(datafeed), RequestOptions.DEFAULT); + client.machineLearning().openJob(new OpenJobRequest(job.getId()), RequestOptions.DEFAULT); + { + //tag::x-pack-ml-start-datafeed-request + StartDatafeedRequest request = new StartDatafeedRequest(datafeedId); // <1> + //end::x-pack-ml-start-datafeed-request + + //tag::x-pack-ml-start-datafeed-request-options + request.setEnd("2018-08-21T00:00:00Z"); // <1> + request.setStart("2018-08-20T00:00:00Z"); // <2> + request.setTimeout(TimeValue.timeValueMinutes(10)); // <3> + //end::x-pack-ml-start-datafeed-request-options + + //tag::x-pack-ml-start-datafeed-execute + StartDatafeedResponse response = client.machineLearning().startDatafeed(request, RequestOptions.DEFAULT); + boolean started = response.isStarted(); // <1> + //end::x-pack-ml-start-datafeed-execute + + assertTrue(started); + } + { + StartDatafeedRequest request = new StartDatafeedRequest(datafeedId); + + // tag::x-pack-ml-start-datafeed-listener + ActionListener listener = new ActionListener() { + @Override + public void onResponse(StartDatafeedResponse response) { + // <1> + } + + @Override + public void onFailure(Exception e) { + // <2> + } + }; + // end::x-pack-ml-start-datafeed-listener + + // Replace the empty listener by a blocking listener in test + final CountDownLatch latch = new CountDownLatch(1); + listener = new LatchedActionListener<>(listener, latch); + + // tag::x-pack-ml-start-datafeed-execute-async + client.machineLearning().startDatafeedAsync(request, RequestOptions.DEFAULT, listener); // <1> + // end::x-pack-ml-start-datafeed-execute-async + + assertTrue(latch.await(30L, TimeUnit.SECONDS)); + } + } + public void testGetBuckets() throws IOException, InterruptedException { RestHighLevelClient client = highLevelClient(); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/StartDatafeedRequestTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/StartDatafeedRequestTests.java new file mode 100644 index 00000000000..fb83f5659cd --- /dev/null +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/StartDatafeedRequestTests.java @@ -0,0 +1,61 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.client.ml; + +import org.elasticsearch.client.ml.datafeed.DatafeedConfigTests; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractXContentTestCase; + +import java.io.IOException; + +public class StartDatafeedRequestTests extends AbstractXContentTestCase { + + public static StartDatafeedRequest createRandomInstance(String datafeedId) { + StartDatafeedRequest request = new StartDatafeedRequest(datafeedId); + + if (randomBoolean()) { + request.setStart(String.valueOf(randomLongBetween(1, 1000))); + } + if (randomBoolean()) { + request.setEnd(String.valueOf(randomLongBetween(1, 1000))); + } + if (randomBoolean()) { + request.setTimeout(TimeValue.timeValueMinutes(randomLongBetween(1, 1000))); + } + + return request; + } + + @Override + protected StartDatafeedRequest createTestInstance() { + String datafeedId = DatafeedConfigTests.randomValidDatafeedId(); + return createRandomInstance(datafeedId); + } + + @Override + protected StartDatafeedRequest doParseInstance(XContentParser parser) throws IOException { + return StartDatafeedRequest.PARSER.parse(parser, null); + } + + @Override + protected boolean supportsUnknownFields() { + return false; + } +} diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/StartDatafeedResponseTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/StartDatafeedResponseTests.java new file mode 100644 index 00000000000..57bc75121d4 --- /dev/null +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/StartDatafeedResponseTests.java @@ -0,0 +1,42 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.client.ml; + +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractXContentTestCase; + +import java.io.IOException; + +public class StartDatafeedResponseTests extends AbstractXContentTestCase { + + @Override + protected StartDatafeedResponse createTestInstance() { + return new StartDatafeedResponse(randomBoolean()); + } + + @Override + protected StartDatafeedResponse doParseInstance(XContentParser parser) throws IOException { + return StartDatafeedResponse.fromXContent(parser); + } + + @Override + protected boolean supportsUnknownFields() { + return true; + } +} diff --git a/docs/java-rest/high-level/ml/start-datafeed.asciidoc b/docs/java-rest/high-level/ml/start-datafeed.asciidoc new file mode 100644 index 00000000000..6bef621562e --- /dev/null +++ b/docs/java-rest/high-level/ml/start-datafeed.asciidoc @@ -0,0 +1,71 @@ +[[java-rest-high-x-pack-ml-start-datafeed]] +=== Start Datafeed API + +The Start Datafeed API provides the ability to start a {ml} datafeed in the cluster. +It accepts a `StartDatafeedRequest` object and responds +with a `StartDatafeedResponse` object. + +[[java-rest-high-x-pack-ml-start-datafeed-request]] +==== Start Datafeed Request + +A `StartDatafeedRequest` object is created referencing a non-null `datafeedId`. +All other fields are optional for the request. + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/MlClientDocumentationIT.java[x-pack-ml-start-datafeed-request] +-------------------------------------------------- +<1> Constructing a new request referencing an existing `datafeedId` + +==== Optional Arguments + +The following arguments are optional. + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/MlClientDocumentationIT.java[x-pack-ml-start-datafeed-request-options] +-------------------------------------------------- +<1> Set when the datafeed should end, the value is exclusive. +May be an epoch seconds, epoch millis or an ISO 8601 string. +"now" is a special value that indicates the current time. +If you do not specify an end time, the datafeed runs continuously. +<2> Set when the datafeed should start, the value is inclusive. +May be an epoch seconds, epoch millis or an ISO 8601 string. +If you do not specify a start time and the datafeed is associated with a new job, +the analysis starts from the earliest time for which data is available. +<3> Set the timeout for the request + +[[java-rest-high-x-pack-ml-start-datafeed-execution]] +==== Execution + +The request can be executed through the `MachineLearningClient` contained +in the `RestHighLevelClient` object, accessed via the `machineLearningClient()` method. + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/MlClientDocumentationIT.java[x-pack-ml-start-datafeed-execute] +-------------------------------------------------- +<1> Did the datafeed successfully start? + +[[java-rest-high-x-pack-ml-start-datafeed-execution-async]] +==== Asynchronous Execution + +The request can also be executed asynchronously: + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/MlClientDocumentationIT.java[x-pack-ml-start-datafeed-execute-async] +-------------------------------------------------- +<1> The `StartDatafeedRequest` to execute and the `ActionListener` to use when +the execution completes + +The method does not block and returns immediately. The passed `ActionListener` is used +to notify the caller of completion. A typical `ActionListener` for `StartDatafeedResponse` may +look like + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/MlClientDocumentationIT.java[x-pack-ml-start-datafeed-listener] +-------------------------------------------------- +<1> `onResponse` is called back when the action is completed successfully +<2> `onFailure` is called back when some unexpected error occurs diff --git a/docs/java-rest/high-level/supported-apis.asciidoc b/docs/java-rest/high-level/supported-apis.asciidoc index f91a2ed8e75..1a62abeb91c 100644 --- a/docs/java-rest/high-level/supported-apis.asciidoc +++ b/docs/java-rest/high-level/supported-apis.asciidoc @@ -225,6 +225,7 @@ The Java High Level REST Client supports the following Machine Learning APIs: * <> * <> * <> +* <> * <> * <> * <> @@ -247,6 +248,7 @@ include::ml/flush-job.asciidoc[] include::ml/put-datafeed.asciidoc[] include::ml/get-datafeed.asciidoc[] include::ml/delete-datafeed.asciidoc[] +include::ml/start-datafeed.asciidoc[] include::ml/get-job-stats.asciidoc[] include::ml/forecast-job.asciidoc[] include::ml/delete-forecast.asciidoc[]