diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/MLRequestConverters.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/MLRequestConverters.java
index 9504c394c69..09c587cf81f 100644
--- a/client/rest-high-level/src/main/java/org/elasticsearch/client/MLRequestConverters.java
+++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/MLRequestConverters.java
@@ -41,6 +41,7 @@ import org.elasticsearch.client.ml.GetOverallBucketsRequest;
import org.elasticsearch.client.ml.GetRecordsRequest;
import org.elasticsearch.client.ml.OpenJobRequest;
import org.elasticsearch.client.ml.PostDataRequest;
+import org.elasticsearch.client.ml.PutDatafeedRequest;
import org.elasticsearch.client.ml.PutJobRequest;
import org.elasticsearch.client.ml.UpdateJobRequest;
import org.elasticsearch.common.Strings;
@@ -182,6 +183,18 @@ final class MLRequestConverters {
return request;
}
+ static Request putDatafeed(PutDatafeedRequest putDatafeedRequest) throws IOException {
+ String endpoint = new EndpointBuilder()
+ .addPathPartAsIs("_xpack")
+ .addPathPartAsIs("ml")
+ .addPathPartAsIs("datafeeds")
+ .addPathPart(putDatafeedRequest.getDatafeed().getId())
+ .build();
+ Request request = new Request(HttpPut.METHOD_NAME, endpoint);
+ request.setEntity(createEntity(putDatafeedRequest, REQUEST_BODY_CONTENT_TYPE));
+ return request;
+ }
+
static Request deleteForecast(DeleteForecastRequest deleteForecastRequest) throws IOException {
String endpoint = new EndpointBuilder()
.addPathPartAsIs("_xpack")
diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/MachineLearningClient.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/MachineLearningClient.java
index d42d2b58d44..79f9267c94d 100644
--- a/client/rest-high-level/src/main/java/org/elasticsearch/client/MachineLearningClient.java
+++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/MachineLearningClient.java
@@ -20,18 +20,15 @@ package org.elasticsearch.client;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
-import org.elasticsearch.client.ml.DeleteForecastRequest;
-import org.elasticsearch.client.ml.ForecastJobRequest;
-import org.elasticsearch.client.ml.ForecastJobResponse;
-import org.elasticsearch.client.ml.PostDataRequest;
-import org.elasticsearch.client.ml.PostDataResponse;
-import org.elasticsearch.client.ml.UpdateJobRequest;
import org.elasticsearch.client.ml.CloseJobRequest;
import org.elasticsearch.client.ml.CloseJobResponse;
+import org.elasticsearch.client.ml.DeleteForecastRequest;
import org.elasticsearch.client.ml.DeleteJobRequest;
import org.elasticsearch.client.ml.DeleteJobResponse;
import org.elasticsearch.client.ml.FlushJobRequest;
import org.elasticsearch.client.ml.FlushJobResponse;
+import org.elasticsearch.client.ml.ForecastJobRequest;
+import org.elasticsearch.client.ml.ForecastJobResponse;
import org.elasticsearch.client.ml.GetBucketsRequest;
import org.elasticsearch.client.ml.GetBucketsResponse;
import org.elasticsearch.client.ml.GetCategoriesRequest;
@@ -48,13 +45,19 @@ import org.elasticsearch.client.ml.GetRecordsRequest;
import org.elasticsearch.client.ml.GetRecordsResponse;
import org.elasticsearch.client.ml.OpenJobRequest;
import org.elasticsearch.client.ml.OpenJobResponse;
+import org.elasticsearch.client.ml.PostDataRequest;
+import org.elasticsearch.client.ml.PostDataResponse;
+import org.elasticsearch.client.ml.PutDatafeedRequest;
+import org.elasticsearch.client.ml.PutDatafeedResponse;
import org.elasticsearch.client.ml.PutJobRequest;
import org.elasticsearch.client.ml.PutJobResponse;
+import org.elasticsearch.client.ml.UpdateJobRequest;
import org.elasticsearch.client.ml.job.stats.JobStats;
import java.io.IOException;
import java.util.Collections;
+
/**
* Machine Learning API client wrapper for the {@link RestHighLevelClient}
*
@@ -451,6 +454,44 @@ public final class MachineLearningClient {
Collections.emptySet());
}
+ /**
+ * Creates a new Machine Learning Datafeed
+ *
+ * For additional info
+     * see <a href="http://www.elastic.co/guide/en/elasticsearch/reference/current/ml-put-datafeed.html">ML PUT datafeed documentation</a>
+ *
+ * @param request The PutDatafeedRequest containing the {@link org.elasticsearch.client.ml.datafeed.DatafeedConfig} settings
+ * @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
+ * @return PutDatafeedResponse with enclosed {@link org.elasticsearch.client.ml.datafeed.DatafeedConfig} object
+ * @throws IOException when there is a serialization issue sending the request or receiving the response
+ */
+ public PutDatafeedResponse putDatafeed(PutDatafeedRequest request, RequestOptions options) throws IOException {
+ return restHighLevelClient.performRequestAndParseEntity(request,
+ MLRequestConverters::putDatafeed,
+ options,
+ PutDatafeedResponse::fromXContent,
+ Collections.emptySet());
+ }
+
+ /**
+ * Creates a new Machine Learning Datafeed asynchronously and notifies listener on completion
+ *
+ * For additional info
+     * see <a href="http://www.elastic.co/guide/en/elasticsearch/reference/current/ml-put-datafeed.html">ML PUT datafeed documentation</a>
+ *
+ * @param request The request containing the {@link org.elasticsearch.client.ml.datafeed.DatafeedConfig} settings
+ * @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
+ * @param listener Listener to be notified upon request completion
+ */
+    public void putDatafeedAsync(PutDatafeedRequest request, RequestOptions options, ActionListener<PutDatafeedResponse> listener) {
+ restHighLevelClient.performRequestAsyncAndParseEntity(request,
+ MLRequestConverters::putDatafeed,
+ options,
+ PutDatafeedResponse::fromXContent,
+ listener,
+ Collections.emptySet());
+ }
+
/**
* Deletes Machine Learning Job Forecasts
*
diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/PutDatafeedRequest.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/PutDatafeedRequest.java
new file mode 100644
index 00000000000..34cb12599a6
--- /dev/null
+++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/PutDatafeedRequest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.client.ml;
+
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.ActionRequestValidationException;
+import org.elasticsearch.client.ml.datafeed.DatafeedConfig;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.xcontent.ToXContentObject;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * Request to create a new Machine Learning Datafeed given a {@link DatafeedConfig} configuration
+ */
+public class PutDatafeedRequest extends ActionRequest implements ToXContentObject {
+
+ private final DatafeedConfig datafeed;
+
+ /**
+ * Construct a new PutDatafeedRequest
+ *
+ * @param datafeed a {@link DatafeedConfig} configuration to create
+ */
+ public PutDatafeedRequest(DatafeedConfig datafeed) {
+ this.datafeed = datafeed;
+ }
+
+ public DatafeedConfig getDatafeed() {
+ return datafeed;
+ }
+
+ @Override
+ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+ return datafeed.toXContent(builder, params);
+ }
+
+ @Override
+ public boolean equals(Object object) {
+ if (this == object) {
+ return true;
+ }
+
+ if (object == null || getClass() != object.getClass()) {
+ return false;
+ }
+
+ PutDatafeedRequest request = (PutDatafeedRequest) object;
+ return Objects.equals(datafeed, request.datafeed);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(datafeed);
+ }
+
+ @Override
+ public final String toString() {
+ return Strings.toString(this);
+ }
+
+ @Override
+ public ActionRequestValidationException validate() {
+ return null;
+ }
+}
diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/PutDatafeedResponse.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/PutDatafeedResponse.java
new file mode 100644
index 00000000000..fa9862fd3b9
--- /dev/null
+++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/PutDatafeedResponse.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.client.ml;
+
+import org.elasticsearch.client.ml.datafeed.DatafeedConfig;
+import org.elasticsearch.common.xcontent.ToXContentObject;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentParser;
+
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * Response containing the newly created {@link DatafeedConfig}
+ */
+public class PutDatafeedResponse implements ToXContentObject {
+
+ private DatafeedConfig datafeed;
+
+ public static PutDatafeedResponse fromXContent(XContentParser parser) throws IOException {
+ return new PutDatafeedResponse(DatafeedConfig.PARSER.parse(parser, null).build());
+ }
+
+ PutDatafeedResponse(DatafeedConfig datafeed) {
+ this.datafeed = datafeed;
+ }
+
+ public DatafeedConfig getResponse() {
+ return datafeed;
+ }
+
+ @Override
+ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+ datafeed.toXContent(builder, params);
+ return builder;
+ }
+
+ @Override
+ public boolean equals(Object object) {
+ if (this == object) {
+ return true;
+ }
+ if (object == null || getClass() != object.getClass()) {
+ return false;
+ }
+ PutDatafeedResponse response = (PutDatafeedResponse) object;
+ return Objects.equals(datafeed, response.datafeed);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(datafeed);
+ }
+}
diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/datafeed/DatafeedConfig.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/datafeed/DatafeedConfig.java
index 752752b1038..84deae61f8e 100644
--- a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/datafeed/DatafeedConfig.java
+++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/datafeed/DatafeedConfig.java
@@ -20,36 +20,37 @@ package org.elasticsearch.client.ml.datafeed;
import org.elasticsearch.client.ml.job.config.Job;
import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.bytes.BytesArray;
+import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
+import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
-import org.elasticsearch.index.query.AbstractQueryBuilder;
+import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.query.QueryBuilder;
-import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
/**
- * Datafeed configuration options pojo. Describes where to proactively pull input
- * data from.
- *
- * If a value has not been set it will be <code>null</code>. Object wrappers are
- * used around integral types and booleans so they can take <code>null</code>
- * values.
+ * The datafeed configuration object. It specifies which indices
+ * to get the data from and offers parameters for customizing different
+ * aspects of the process.
*/
public class DatafeedConfig implements ToXContentObject {
- public static final int DEFAULT_SCROLL_SIZE = 1000;
-
public static final ParseField ID = new ParseField("datafeed_id");
public static final ParseField QUERY_DELAY = new ParseField("query_delay");
public static final ParseField FREQUENCY = new ParseField("frequency");
@@ -59,7 +60,6 @@ public class DatafeedConfig implements ToXContentObject {
public static final ParseField QUERY = new ParseField("query");
public static final ParseField SCROLL_SIZE = new ParseField("scroll_size");
public static final ParseField AGGREGATIONS = new ParseField("aggregations");
- public static final ParseField AGGS = new ParseField("aggs");
public static final ParseField SCRIPT_FIELDS = new ParseField("script_fields");
public static final ParseField CHUNKING_CONFIG = new ParseField("chunking_config");
@@ -77,9 +77,8 @@ public class DatafeedConfig implements ToXContentObject {
builder.setQueryDelay(TimeValue.parseTimeValue(val, QUERY_DELAY.getPreferredName())), QUERY_DELAY);
PARSER.declareString((builder, val) ->
builder.setFrequency(TimeValue.parseTimeValue(val, FREQUENCY.getPreferredName())), FREQUENCY);
- PARSER.declareObject(Builder::setQuery, (p, c) -> AbstractQueryBuilder.parseInnerQueryBuilder(p), QUERY);
- PARSER.declareObject(Builder::setAggregations, (p, c) -> AggregatorFactories.parseAggregators(p), AGGREGATIONS);
- PARSER.declareObject(Builder::setAggregations, (p, c) -> AggregatorFactories.parseAggregators(p), AGGS);
+ PARSER.declareField(Builder::setQuery, DatafeedConfig::parseBytes, QUERY, ObjectParser.ValueType.OBJECT);
+ PARSER.declareField(Builder::setAggregations, DatafeedConfig::parseBytes, AGGREGATIONS, ObjectParser.ValueType.OBJECT);
PARSER.declareObject(Builder::setScriptFields, (p, c) -> {
             List<SearchSourceBuilder.ScriptField> parsedScriptFields = new ArrayList<>();
while (p.nextToken() != XContentParser.Token.END_OBJECT) {
@@ -91,29 +90,26 @@ public class DatafeedConfig implements ToXContentObject {
PARSER.declareObject(Builder::setChunkingConfig, ChunkingConfig.PARSER, CHUNKING_CONFIG);
}
+ private static BytesReference parseBytes(XContentParser parser) throws IOException {
+ XContentBuilder contentBuilder = JsonXContent.contentBuilder();
+ contentBuilder.generator().copyCurrentStructure(parser);
+ return BytesReference.bytes(contentBuilder);
+ }
+
private final String id;
private final String jobId;
-
- /**
- * The delay before starting to query a period of time
- */
private final TimeValue queryDelay;
-
- /**
- * The frequency with which queries are executed
- */
private final TimeValue frequency;
-
private final List indices;
private final List types;
- private final QueryBuilder query;
- private final AggregatorFactories.Builder aggregations;
+ private final BytesReference query;
+ private final BytesReference aggregations;
private final List scriptFields;
private final Integer scrollSize;
private final ChunkingConfig chunkingConfig;
     private DatafeedConfig(String id, String jobId, TimeValue queryDelay, TimeValue frequency, List<String> indices, List<String> types,
-                           QueryBuilder query, AggregatorFactories.Builder aggregations, List<SearchSourceBuilder.ScriptField> scriptFields,
+                           BytesReference query, BytesReference aggregations, List<SearchSourceBuilder.ScriptField> scriptFields,
Integer scrollSize, ChunkingConfig chunkingConfig) {
this.id = id;
this.jobId = jobId;
@@ -156,11 +152,11 @@ public class DatafeedConfig implements ToXContentObject {
return scrollSize;
}
- public QueryBuilder getQuery() {
+ public BytesReference getQuery() {
return query;
}
- public AggregatorFactories.Builder getAggregations() {
+ public BytesReference getAggregations() {
return aggregations;
}
@@ -183,11 +179,17 @@ public class DatafeedConfig implements ToXContentObject {
if (frequency != null) {
builder.field(FREQUENCY.getPreferredName(), frequency.getStringRep());
}
- builder.field(INDICES.getPreferredName(), indices);
- builder.field(TYPES.getPreferredName(), types);
- builder.field(QUERY.getPreferredName(), query);
+ if (indices != null) {
+ builder.field(INDICES.getPreferredName(), indices);
+ }
+ if (types != null) {
+ builder.field(TYPES.getPreferredName(), types);
+ }
+ if (query != null) {
+ builder.field(QUERY.getPreferredName(), asMap(query));
+ }
if (aggregations != null) {
- builder.field(AGGREGATIONS.getPreferredName(), aggregations);
+ builder.field(AGGREGATIONS.getPreferredName(), asMap(aggregations));
}
if (scriptFields != null) {
builder.startObject(SCRIPT_FIELDS.getPreferredName());
@@ -196,7 +198,9 @@ public class DatafeedConfig implements ToXContentObject {
}
builder.endObject();
}
- builder.field(SCROLL_SIZE.getPreferredName(), scrollSize);
+ if (scrollSize != null) {
+ builder.field(SCROLL_SIZE.getPreferredName(), scrollSize);
+ }
if (chunkingConfig != null) {
builder.field(CHUNKING_CONFIG.getPreferredName(), chunkingConfig);
}
@@ -205,10 +209,18 @@ public class DatafeedConfig implements ToXContentObject {
return builder;
}
+    private static Map<String, Object> asMap(BytesReference bytesReference) {
+        return bytesReference == null ? null : XContentHelper.convertToMap(bytesReference, true, XContentType.JSON).v2();
+    }
+
/**
* The lists of indices and types are compared for equality but they are not
* sorted first so this test could fail simply because the indices and types
* lists are in different orders.
+ *
+ * Also note this could be a heavy operation when a query or aggregations
+ * are set as we need to convert the bytes references into maps to correctly
+ * compare them.
*/
@Override
public boolean equals(Object other) {
@@ -228,31 +240,40 @@ public class DatafeedConfig implements ToXContentObject {
&& Objects.equals(this.queryDelay, that.queryDelay)
&& Objects.equals(this.indices, that.indices)
&& Objects.equals(this.types, that.types)
- && Objects.equals(this.query, that.query)
+ && Objects.equals(asMap(this.query), asMap(that.query))
&& Objects.equals(this.scrollSize, that.scrollSize)
- && Objects.equals(this.aggregations, that.aggregations)
+ && Objects.equals(asMap(this.aggregations), asMap(that.aggregations))
&& Objects.equals(this.scriptFields, that.scriptFields)
&& Objects.equals(this.chunkingConfig, that.chunkingConfig);
}
+ /**
+ * Note this could be a heavy operation when a query or aggregations
+ * are set as we need to convert the bytes references into maps to
+ * compute a stable hash code.
+ */
@Override
public int hashCode() {
- return Objects.hash(id, jobId, frequency, queryDelay, indices, types, query, scrollSize, aggregations, scriptFields,
+ return Objects.hash(id, jobId, frequency, queryDelay, indices, types, asMap(query), scrollSize, asMap(aggregations), scriptFields,
chunkingConfig);
}
+ public static Builder builder(String id, String jobId) {
+ return new Builder(id, jobId);
+ }
+
public static class Builder {
private String id;
private String jobId;
private TimeValue queryDelay;
private TimeValue frequency;
-        private List<String> indices = Collections.emptyList();
-        private List<String> types = Collections.emptyList();
-        private QueryBuilder query = QueryBuilders.matchAllQuery();
-        private AggregatorFactories.Builder aggregations;
+        private List<String> indices;
+        private List<String> types;
+        private BytesReference query;
+        private BytesReference aggregations;
         private List<SearchSourceBuilder.ScriptField> scriptFields;
-        private Integer scrollSize = DEFAULT_SCROLL_SIZE;
+        private Integer scrollSize;
private ChunkingConfig chunkingConfig;
public Builder(String id, String jobId) {
@@ -279,8 +300,12 @@ public class DatafeedConfig implements ToXContentObject {
return this;
}
+ public Builder setIndices(String... indices) {
+ return setIndices(Arrays.asList(indices));
+ }
+
         public Builder setTypes(List<String> types) {
- this.types = Objects.requireNonNull(types, TYPES.getPreferredName());
+ this.types = types;
return this;
}
@@ -294,16 +319,36 @@ public class DatafeedConfig implements ToXContentObject {
return this;
}
- public Builder setQuery(QueryBuilder query) {
- this.query = Objects.requireNonNull(query, QUERY.getPreferredName());
+ private Builder setQuery(BytesReference query) {
+ this.query = query;
return this;
}
- public Builder setAggregations(AggregatorFactories.Builder aggregations) {
+ public Builder setQuery(String queryAsJson) {
+ this.query = queryAsJson == null ? null : new BytesArray(queryAsJson);
+ return this;
+ }
+
+ public Builder setQuery(QueryBuilder query) throws IOException {
+ this.query = query == null ? null : xContentToBytes(query);
+ return this;
+ }
+
+ private Builder setAggregations(BytesReference aggregations) {
this.aggregations = aggregations;
return this;
}
+ public Builder setAggregations(String aggsAsJson) {
+ this.aggregations = aggsAsJson == null ? null : new BytesArray(aggsAsJson);
+ return this;
+ }
+
+ public Builder setAggregations(AggregatorFactories.Builder aggregations) throws IOException {
+ this.aggregations = aggregations == null ? null : xContentToBytes(aggregations);
+ return this;
+ }
+
public Builder setScriptFields(List scriptFields) {
List sorted = new ArrayList<>(scriptFields);
sorted.sort(Comparator.comparing(SearchSourceBuilder.ScriptField::fieldName));
@@ -325,5 +370,12 @@ public class DatafeedConfig implements ToXContentObject {
return new DatafeedConfig(id, jobId, queryDelay, frequency, indices, types, query, aggregations, scriptFields, scrollSize,
chunkingConfig);
}
+
+ private static BytesReference xContentToBytes(ToXContentObject object) throws IOException {
+ try (XContentBuilder builder = JsonXContent.contentBuilder()) {
+ object.toXContent(builder, ToXContentObject.EMPTY_PARAMS);
+ return BytesReference.bytes(builder);
+ }
+ }
}
}
diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/datafeed/DatafeedUpdate.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/datafeed/DatafeedUpdate.java
index 184d5d51481..1e59ea067ca 100644
--- a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/datafeed/DatafeedUpdate.java
+++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/datafeed/DatafeedUpdate.java
@@ -20,12 +20,17 @@ package org.elasticsearch.client.ml.datafeed;
import org.elasticsearch.client.ml.job.config.Job;
import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.bytes.BytesArray;
+import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
+import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
-import org.elasticsearch.index.query.AbstractQueryBuilder;
+import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.builder.SearchSourceBuilder;
@@ -35,6 +40,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
/**
@@ -58,11 +64,9 @@ public class DatafeedUpdate implements ToXContentObject {
TimeValue.parseTimeValue(val, DatafeedConfig.QUERY_DELAY.getPreferredName())), DatafeedConfig.QUERY_DELAY);
PARSER.declareString((builder, val) -> builder.setFrequency(
TimeValue.parseTimeValue(val, DatafeedConfig.FREQUENCY.getPreferredName())), DatafeedConfig.FREQUENCY);
- PARSER.declareObject(Builder::setQuery, (p, c) -> AbstractQueryBuilder.parseInnerQueryBuilder(p), DatafeedConfig.QUERY);
- PARSER.declareObject(Builder::setAggregations, (p, c) -> AggregatorFactories.parseAggregators(p),
- DatafeedConfig.AGGREGATIONS);
- PARSER.declareObject(Builder::setAggregations, (p, c) -> AggregatorFactories.parseAggregators(p),
- DatafeedConfig.AGGS);
+ PARSER.declareField(Builder::setQuery, DatafeedUpdate::parseBytes, DatafeedConfig.QUERY, ObjectParser.ValueType.OBJECT);
+ PARSER.declareField(Builder::setAggregations, DatafeedUpdate::parseBytes, DatafeedConfig.AGGREGATIONS,
+ ObjectParser.ValueType.OBJECT);
PARSER.declareObject(Builder::setScriptFields, (p, c) -> {
             List<SearchSourceBuilder.ScriptField> parsedScriptFields = new ArrayList<>();
while (p.nextToken() != XContentParser.Token.END_OBJECT) {
@@ -74,20 +78,26 @@ public class DatafeedUpdate implements ToXContentObject {
PARSER.declareObject(Builder::setChunkingConfig, ChunkingConfig.PARSER, DatafeedConfig.CHUNKING_CONFIG);
}
+ private static BytesReference parseBytes(XContentParser parser) throws IOException {
+ XContentBuilder contentBuilder = JsonXContent.contentBuilder();
+ contentBuilder.generator().copyCurrentStructure(parser);
+ return BytesReference.bytes(contentBuilder);
+ }
+
private final String id;
private final String jobId;
private final TimeValue queryDelay;
private final TimeValue frequency;
     private final List<String> indices;
     private final List<String> types;
- private final QueryBuilder query;
- private final AggregatorFactories.Builder aggregations;
+ private final BytesReference query;
+ private final BytesReference aggregations;
private final List scriptFields;
private final Integer scrollSize;
private final ChunkingConfig chunkingConfig;
     private DatafeedUpdate(String id, String jobId, TimeValue queryDelay, TimeValue frequency, List<String> indices, List<String> types,
-                           QueryBuilder query, AggregatorFactories.Builder aggregations, List<SearchSourceBuilder.ScriptField> scriptFields,
+                           BytesReference query, BytesReference aggregations, List<SearchSourceBuilder.ScriptField> scriptFields,
Integer scrollSize, ChunkingConfig chunkingConfig) {
this.id = id;
this.jobId = jobId;
@@ -121,9 +131,13 @@ public class DatafeedUpdate implements ToXContentObject {
builder.field(DatafeedConfig.FREQUENCY.getPreferredName(), frequency.getStringRep());
}
addOptionalField(builder, DatafeedConfig.INDICES, indices);
+ if (query != null) {
+ builder.field(DatafeedConfig.QUERY.getPreferredName(), asMap(query));
+ }
+ if (aggregations != null) {
+ builder.field(DatafeedConfig.AGGREGATIONS.getPreferredName(), asMap(aggregations));
+ }
addOptionalField(builder, DatafeedConfig.TYPES, types);
- addOptionalField(builder, DatafeedConfig.QUERY, query);
- addOptionalField(builder, DatafeedConfig.AGGREGATIONS, aggregations);
if (scriptFields != null) {
builder.startObject(DatafeedConfig.SCRIPT_FIELDS.getPreferredName());
for (SearchSourceBuilder.ScriptField scriptField : scriptFields) {
@@ -167,11 +181,11 @@ public class DatafeedUpdate implements ToXContentObject {
return scrollSize;
}
- public QueryBuilder getQuery() {
+ public BytesReference getQuery() {
return query;
}
- public AggregatorFactories.Builder getAggregations() {
+ public BytesReference getAggregations() {
return aggregations;
}
@@ -183,10 +197,18 @@ public class DatafeedUpdate implements ToXContentObject {
return chunkingConfig;
}
+    private static Map<String, Object> asMap(BytesReference bytesReference) {
+        return bytesReference == null ? null : XContentHelper.convertToMap(bytesReference, true, XContentType.JSON).v2();
+    }
+
/**
* The lists of indices and types are compared for equality but they are not
* sorted first so this test could fail simply because the indices and types
* lists are in different orders.
+ *
+ * Also note this could be a heavy operation when a query or aggregations
+ * are set as we need to convert the bytes references into maps to correctly
+ * compare them.
*/
@Override
public boolean equals(Object other) {
@@ -206,19 +228,28 @@ public class DatafeedUpdate implements ToXContentObject {
&& Objects.equals(this.queryDelay, that.queryDelay)
&& Objects.equals(this.indices, that.indices)
&& Objects.equals(this.types, that.types)
- && Objects.equals(this.query, that.query)
+ && Objects.equals(asMap(this.query), asMap(that.query))
&& Objects.equals(this.scrollSize, that.scrollSize)
- && Objects.equals(this.aggregations, that.aggregations)
+ && Objects.equals(asMap(this.aggregations), asMap(that.aggregations))
&& Objects.equals(this.scriptFields, that.scriptFields)
&& Objects.equals(this.chunkingConfig, that.chunkingConfig);
}
+ /**
+ * Note this could be a heavy operation when a query or aggregations
+ * are set as we need to convert the bytes references into maps to
+ * compute a stable hash code.
+ */
@Override
public int hashCode() {
- return Objects.hash(id, jobId, frequency, queryDelay, indices, types, query, scrollSize, aggregations, scriptFields,
+ return Objects.hash(id, jobId, frequency, queryDelay, indices, types, asMap(query), scrollSize, asMap(aggregations), scriptFields,
chunkingConfig);
}
+ public static Builder builder(String id) {
+ return new Builder(id);
+ }
+
public static class Builder {
private String id;
@@ -227,8 +258,8 @@ public class DatafeedUpdate implements ToXContentObject {
         private TimeValue frequency;
         private List<String> indices;
         private List<String> types;
-        private QueryBuilder query;
-        private AggregatorFactories.Builder aggregations;
+        private BytesReference query;
+        private BytesReference aggregations;
         private List<SearchSourceBuilder.ScriptField> scriptFields;
         private Integer scrollSize;
         private ChunkingConfig chunkingConfig;
@@ -276,16 +307,36 @@ public class DatafeedUpdate implements ToXContentObject {
return this;
}
- public Builder setQuery(QueryBuilder query) {
+ private Builder setQuery(BytesReference query) {
this.query = query;
return this;
}
- public Builder setAggregations(AggregatorFactories.Builder aggregations) {
+ public Builder setQuery(String queryAsJson) {
+ this.query = queryAsJson == null ? null : new BytesArray(queryAsJson);
+ return this;
+ }
+
+ public Builder setQuery(QueryBuilder query) throws IOException {
+ this.query = query == null ? null : xContentToBytes(query);
+ return this;
+ }
+
+ private Builder setAggregations(BytesReference aggregations) {
this.aggregations = aggregations;
return this;
}
+ public Builder setAggregations(String aggsAsJson) {
+ this.aggregations = aggsAsJson == null ? null : new BytesArray(aggsAsJson);
+ return this;
+ }
+
+ public Builder setAggregations(AggregatorFactories.Builder aggregations) throws IOException {
+ this.aggregations = aggregations == null ? null : xContentToBytes(aggregations);
+ return this;
+ }
+
public Builder setScriptFields(List scriptFields) {
List sorted = new ArrayList<>(scriptFields);
sorted.sort(Comparator.comparing(SearchSourceBuilder.ScriptField::fieldName));
@@ -307,5 +358,12 @@ public class DatafeedUpdate implements ToXContentObject {
return new DatafeedUpdate(id, jobId, queryDelay, frequency, indices, types, query, aggregations, scriptFields, scrollSize,
chunkingConfig);
}
+
+ private static BytesReference xContentToBytes(ToXContentObject object) throws IOException {
+ try (XContentBuilder builder = JsonXContent.contentBuilder()) {
+ object.toXContent(builder, ToXContentObject.EMPTY_PARAMS);
+ return BytesReference.bytes(builder);
+ }
+ }
}
}
diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/MLRequestConvertersTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/MLRequestConvertersTests.java
index d63573b534c..19db672e35b 100644
--- a/client/rest-high-level/src/test/java/org/elasticsearch/client/MLRequestConvertersTests.java
+++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/MLRequestConvertersTests.java
@@ -37,8 +37,11 @@ import org.elasticsearch.client.ml.GetOverallBucketsRequest;
import org.elasticsearch.client.ml.GetRecordsRequest;
import org.elasticsearch.client.ml.OpenJobRequest;
import org.elasticsearch.client.ml.PostDataRequest;
+import org.elasticsearch.client.ml.PutDatafeedRequest;
import org.elasticsearch.client.ml.PutJobRequest;
import org.elasticsearch.client.ml.UpdateJobRequest;
+import org.elasticsearch.client.ml.datafeed.DatafeedConfig;
+import org.elasticsearch.client.ml.datafeed.DatafeedConfigTests;
import org.elasticsearch.client.ml.job.config.AnalysisConfig;
import org.elasticsearch.client.ml.job.config.Detector;
import org.elasticsearch.client.ml.job.config.Job;
@@ -206,6 +209,20 @@ public class MLRequestConvertersTests extends ESTestCase {
}
}
+ public void testPutDatafeed() throws IOException {
+ DatafeedConfig datafeed = DatafeedConfigTests.createRandom();
+ PutDatafeedRequest putDatafeedRequest = new PutDatafeedRequest(datafeed);
+
+ Request request = MLRequestConverters.putDatafeed(putDatafeedRequest);
+
+ assertEquals(HttpPut.METHOD_NAME, request.getMethod());
+ assertThat(request.getEndpoint(), equalTo("/_xpack/ml/datafeeds/" + datafeed.getId()));
+ try (XContentParser parser = createParser(JsonXContent.jsonXContent, request.getEntity().getContent())) {
+ DatafeedConfig parsedDatafeed = DatafeedConfig.PARSER.apply(parser, null).build();
+ assertThat(parsedDatafeed, equalTo(datafeed));
+ }
+ }
+
public void testDeleteForecast() throws Exception {
String jobId = randomAlphaOfLength(10);
DeleteForecastRequest deleteForecastRequest = new DeleteForecastRequest(jobId);
diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java
index db680aaa95d..c0bf1055058 100644
--- a/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java
+++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java
@@ -23,34 +23,37 @@ import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
-import org.elasticsearch.client.ml.DeleteForecastRequest;
-import org.elasticsearch.client.ml.ForecastJobRequest;
-import org.elasticsearch.client.ml.ForecastJobResponse;
-import org.elasticsearch.client.ml.PostDataRequest;
-import org.elasticsearch.client.ml.PostDataResponse;
-import org.elasticsearch.client.ml.UpdateJobRequest;
-import org.elasticsearch.client.ml.job.config.JobUpdate;
-import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.client.ml.GetJobStatsRequest;
-import org.elasticsearch.client.ml.GetJobStatsResponse;
-import org.elasticsearch.client.ml.job.config.JobState;
-import org.elasticsearch.client.ml.job.stats.JobStats;
import org.elasticsearch.client.ml.CloseJobRequest;
import org.elasticsearch.client.ml.CloseJobResponse;
+import org.elasticsearch.client.ml.DeleteForecastRequest;
import org.elasticsearch.client.ml.DeleteJobRequest;
import org.elasticsearch.client.ml.DeleteJobResponse;
+import org.elasticsearch.client.ml.FlushJobRequest;
+import org.elasticsearch.client.ml.FlushJobResponse;
+import org.elasticsearch.client.ml.ForecastJobRequest;
+import org.elasticsearch.client.ml.ForecastJobResponse;
import org.elasticsearch.client.ml.GetJobRequest;
import org.elasticsearch.client.ml.GetJobResponse;
+import org.elasticsearch.client.ml.GetJobStatsRequest;
+import org.elasticsearch.client.ml.GetJobStatsResponse;
import org.elasticsearch.client.ml.OpenJobRequest;
import org.elasticsearch.client.ml.OpenJobResponse;
+import org.elasticsearch.client.ml.PostDataRequest;
+import org.elasticsearch.client.ml.PostDataResponse;
+import org.elasticsearch.client.ml.PutDatafeedRequest;
+import org.elasticsearch.client.ml.PutDatafeedResponse;
import org.elasticsearch.client.ml.PutJobRequest;
import org.elasticsearch.client.ml.PutJobResponse;
+import org.elasticsearch.client.ml.UpdateJobRequest;
+import org.elasticsearch.client.ml.datafeed.DatafeedConfig;
import org.elasticsearch.client.ml.job.config.AnalysisConfig;
import org.elasticsearch.client.ml.job.config.DataDescription;
import org.elasticsearch.client.ml.job.config.Detector;
import org.elasticsearch.client.ml.job.config.Job;
-import org.elasticsearch.client.ml.FlushJobRequest;
-import org.elasticsearch.client.ml.FlushJobResponse;
+import org.elasticsearch.client.ml.job.config.JobState;
+import org.elasticsearch.client.ml.job.config.JobUpdate;
+import org.elasticsearch.client.ml.job.stats.JobStats;
+import org.elasticsearch.common.unit.TimeValue;
import org.junit.After;
import java.io.IOException;
@@ -292,6 +295,23 @@ public class MachineLearningIT extends ESRestHighLevelClientTestCase {
assertEquals("Updated description", getResponse.jobs().get(0).getDescription());
}
+ public void testPutDatafeed() throws Exception {
+ String jobId = randomValidJobId();
+ Job job = buildJob(jobId);
+ MachineLearningClient machineLearningClient = highLevelClient().machineLearning();
+ execute(new PutJobRequest(job), machineLearningClient::putJob, machineLearningClient::putJobAsync);
+
+ String datafeedId = "datafeed-" + jobId;
+ DatafeedConfig datafeedConfig = DatafeedConfig.builder(datafeedId, jobId).setIndices("some_data_index").build();
+
+ PutDatafeedResponse response = execute(new PutDatafeedRequest(datafeedConfig), machineLearningClient::putDatafeed,
+ machineLearningClient::putDatafeedAsync);
+
+ DatafeedConfig createdDatafeed = response.getResponse();
+ assertThat(createdDatafeed.getId(), equalTo(datafeedId));
+ assertThat(createdDatafeed.getIndices(), equalTo(datafeedConfig.getIndices()));
+ }
+
public void testDeleteForecast() throws Exception {
String jobId = "test-delete-forecast";
diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java
index 2da0da0c53f..3e43792ac6a 100644
--- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java
+++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java
@@ -59,20 +59,24 @@ import org.elasticsearch.client.ml.OpenJobRequest;
import org.elasticsearch.client.ml.OpenJobResponse;
import org.elasticsearch.client.ml.PostDataRequest;
import org.elasticsearch.client.ml.PostDataResponse;
+import org.elasticsearch.client.ml.PutDatafeedRequest;
+import org.elasticsearch.client.ml.PutDatafeedResponse;
import org.elasticsearch.client.ml.PutJobRequest;
import org.elasticsearch.client.ml.PutJobResponse;
import org.elasticsearch.client.ml.UpdateJobRequest;
+import org.elasticsearch.client.ml.datafeed.ChunkingConfig;
+import org.elasticsearch.client.ml.datafeed.DatafeedConfig;
import org.elasticsearch.client.ml.job.config.AnalysisConfig;
import org.elasticsearch.client.ml.job.config.AnalysisLimits;
import org.elasticsearch.client.ml.job.config.DataDescription;
import org.elasticsearch.client.ml.job.config.DetectionRule;
import org.elasticsearch.client.ml.job.config.Detector;
import org.elasticsearch.client.ml.job.config.Job;
-import org.elasticsearch.client.ml.job.process.DataCounts;
import org.elasticsearch.client.ml.job.config.JobUpdate;
import org.elasticsearch.client.ml.job.config.ModelPlotConfig;
import org.elasticsearch.client.ml.job.config.Operator;
import org.elasticsearch.client.ml.job.config.RuleCondition;
+import org.elasticsearch.client.ml.job.process.DataCounts;
import org.elasticsearch.client.ml.job.results.AnomalyRecord;
import org.elasticsearch.client.ml.job.results.Bucket;
import org.elasticsearch.client.ml.job.results.CategoryDefinition;
@@ -82,6 +86,9 @@ import org.elasticsearch.client.ml.job.stats.JobStats;
import org.elasticsearch.client.ml.job.util.PageParams;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.aggregations.AggregatorFactories;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.junit.After;
import java.io.IOException;
@@ -97,6 +104,7 @@ import java.util.stream.Collectors;
import static org.hamcrest.Matchers.closeTo;
import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.core.Is.is;
@@ -189,8 +197,6 @@ public class MlClientDocumentationIT extends ESRestHighLevelClientTestCase {
public void testGetJob() throws Exception {
RestHighLevelClient client = highLevelClient();
- String jobId = "get-machine-learning-job1";
-
Job job = MachineLearningIT.buildJob("get-machine-learning-job1");
client.machineLearning().putJob(new PutJobRequest(job), RequestOptions.DEFAULT);
@@ -481,6 +487,106 @@ public class MlClientDocumentationIT extends ESRestHighLevelClientTestCase {
}
}
+ public void testPutDatafeed() throws Exception {
+ RestHighLevelClient client = highLevelClient();
+
+ {
+ // We need to create a job for the datafeed request to be valid
+ String jobId = "put-datafeed-job-1";
+ Job job = MachineLearningIT.buildJob(jobId);
+ client.machineLearning().putJob(new PutJobRequest(job), RequestOptions.DEFAULT);
+
+ String id = "datafeed-1";
+
+ //tag::x-pack-ml-create-datafeed-config
+ DatafeedConfig.Builder datafeedBuilder = new DatafeedConfig.Builder(id, jobId) // <1>
+ .setIndices("index_1", "index_2"); // <2>
+ //end::x-pack-ml-create-datafeed-config
+
+ AggregatorFactories.Builder aggs = AggregatorFactories.builder();
+
+ //tag::x-pack-ml-create-datafeed-config-set-aggregations
+ datafeedBuilder.setAggregations(aggs); // <1>
+ //end::x-pack-ml-create-datafeed-config-set-aggregations
+
+ // Clearing aggregation to avoid complex validation rules
+ datafeedBuilder.setAggregations((String) null);
+
+ //tag::x-pack-ml-create-datafeed-config-set-chunking-config
+ datafeedBuilder.setChunkingConfig(ChunkingConfig.newAuto()); // <1>
+ //end::x-pack-ml-create-datafeed-config-set-chunking-config
+
+ //tag::x-pack-ml-create-datafeed-config-set-frequency
+ datafeedBuilder.setFrequency(TimeValue.timeValueSeconds(30)); // <1>
+ //end::x-pack-ml-create-datafeed-config-set-frequency
+
+ //tag::x-pack-ml-create-datafeed-config-set-query
+ datafeedBuilder.setQuery(QueryBuilders.matchAllQuery()); // <1>
+ //end::x-pack-ml-create-datafeed-config-set-query
+
+ //tag::x-pack-ml-create-datafeed-config-set-query-delay
+ datafeedBuilder.setQueryDelay(TimeValue.timeValueMinutes(1)); // <1>
+ //end::x-pack-ml-create-datafeed-config-set-query-delay
+
+ List<SearchSourceBuilder.ScriptField> scriptFields = Collections.emptyList();
+ //tag::x-pack-ml-create-datafeed-config-set-script-fields
+ datafeedBuilder.setScriptFields(scriptFields); // <1>
+ //end::x-pack-ml-create-datafeed-config-set-script-fields
+
+ //tag::x-pack-ml-create-datafeed-config-set-scroll-size
+ datafeedBuilder.setScrollSize(1000); // <1>
+ //end::x-pack-ml-create-datafeed-config-set-scroll-size
+
+ //tag::x-pack-ml-put-datafeed-request
+ PutDatafeedRequest request = new PutDatafeedRequest(datafeedBuilder.build()); // <1>
+ //end::x-pack-ml-put-datafeed-request
+
+ //tag::x-pack-ml-put-datafeed-execute
+ PutDatafeedResponse response = client.machineLearning().putDatafeed(request, RequestOptions.DEFAULT);
+ //end::x-pack-ml-put-datafeed-execute
+
+ //tag::x-pack-ml-put-datafeed-response
+ DatafeedConfig datafeed = response.getResponse(); // <1>
+ //end::x-pack-ml-put-datafeed-response
+ assertThat(datafeed.getId(), equalTo("datafeed-1"));
+ }
+ {
+ // We need to create a job for the datafeed request to be valid
+ String jobId = "put-datafeed-job-2";
+ Job job = MachineLearningIT.buildJob(jobId);
+ client.machineLearning().putJob(new PutJobRequest(job), RequestOptions.DEFAULT);
+
+ String id = "datafeed-2";
+
+ DatafeedConfig datafeed = new DatafeedConfig.Builder(id, jobId).setIndices("index_1", "index_2").build();
+
+ PutDatafeedRequest request = new PutDatafeedRequest(datafeed);
+ // tag::x-pack-ml-put-datafeed-execute-listener
+ ActionListener<PutDatafeedResponse> listener = new ActionListener<PutDatafeedResponse>() {
+ @Override
+ public void onResponse(PutDatafeedResponse response) {
+ // <1>
+ }
+
+ @Override
+ public void onFailure(Exception e) {
+ // <2>
+ }
+ };
+ // end::x-pack-ml-put-datafeed-execute-listener
+
+ // Replace the empty listener by a blocking listener in test
+ final CountDownLatch latch = new CountDownLatch(1);
+ listener = new LatchedActionListener<>(listener, latch);
+
+ // tag::x-pack-ml-put-datafeed-execute-async
+ client.machineLearning().putDatafeedAsync(request, RequestOptions.DEFAULT, listener); // <1>
+ // end::x-pack-ml-put-datafeed-execute-async
+
+ assertTrue(latch.await(30L, TimeUnit.SECONDS));
+ }
+ }
+
public void testGetBuckets() throws IOException, InterruptedException {
RestHighLevelClient client = highLevelClient();
diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/PutDatafeedRequestTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/PutDatafeedRequestTests.java
new file mode 100644
index 00000000000..5af30d32574
--- /dev/null
+++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/PutDatafeedRequestTests.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.client.ml;
+
+import org.elasticsearch.client.ml.datafeed.DatafeedConfig;
+import org.elasticsearch.client.ml.datafeed.DatafeedConfigTests;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.test.AbstractXContentTestCase;
+
+
+public class PutDatafeedRequestTests extends AbstractXContentTestCase<PutDatafeedRequest> {
+
+ @Override
+ protected PutDatafeedRequest createTestInstance() {
+ return new PutDatafeedRequest(DatafeedConfigTests.createRandom());
+ }
+
+ @Override
+ protected PutDatafeedRequest doParseInstance(XContentParser parser) {
+ return new PutDatafeedRequest(DatafeedConfig.PARSER.apply(parser, null).build());
+ }
+
+ @Override
+ protected boolean supportsUnknownFields() {
+ return false;
+ }
+}
diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/PutDatafeedResponseTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/PutDatafeedResponseTests.java
new file mode 100644
index 00000000000..5b2428167b9
--- /dev/null
+++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/PutDatafeedResponseTests.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.client.ml;
+
+import org.elasticsearch.client.ml.datafeed.DatafeedConfigTests;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.test.AbstractXContentTestCase;
+
+import java.io.IOException;
+import java.util.function.Predicate;
+
+public class PutDatafeedResponseTests extends AbstractXContentTestCase<PutDatafeedResponse> {
+
+ @Override
+ protected PutDatafeedResponse createTestInstance() {
+ return new PutDatafeedResponse(DatafeedConfigTests.createRandom());
+ }
+
+ @Override
+ protected PutDatafeedResponse doParseInstance(XContentParser parser) throws IOException {
+ return PutDatafeedResponse.fromXContent(parser);
+ }
+
+ @Override
+ protected boolean supportsUnknownFields() {
+ return true;
+ }
+
+ @Override
+ protected Predicate<String> getRandomFieldsExcludeFilter() {
+ return field -> !field.isEmpty();
+ }
+}
diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/datafeed/DatafeedConfigTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/datafeed/DatafeedConfigTests.java
index 8ed51415521..3a7910ad732 100644
--- a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/datafeed/DatafeedConfigTests.java
+++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/datafeed/DatafeedConfigTests.java
@@ -19,7 +19,6 @@
package org.elasticsearch.client.ml.datafeed;
import com.carrotsearch.randomizedtesting.generators.CodepointSetGenerator;
-import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
@@ -27,7 +26,6 @@ import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilders;
-import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.metrics.MaxAggregationBuilder;
@@ -36,19 +34,26 @@ import org.elasticsearch.test.AbstractXContentTestCase;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
public class DatafeedConfigTests extends AbstractXContentTestCase<DatafeedConfig> {
@Override
protected DatafeedConfig createTestInstance() {
+ return createRandom();
+ }
+
+ public static DatafeedConfig createRandom() {
long bucketSpanMillis = 3600000;
DatafeedConfig.Builder builder = constructBuilder();
builder.setIndices(randomStringList(1, 10));
builder.setTypes(randomStringList(0, 10));
if (randomBoolean()) {
- builder.setQuery(QueryBuilders.termQuery(randomAlphaOfLength(10), randomAlphaOfLength(10)));
+ try {
+ builder.setQuery(QueryBuilders.termQuery(randomAlphaOfLength(10), randomAlphaOfLength(10)));
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to serialize query", e);
+ }
}
boolean addScriptFields = randomBoolean();
if (addScriptFields) {
@@ -72,7 +77,11 @@ public class DatafeedConfigTests extends AbstractXContentTestCase<DatafeedConfig> {
    public static List<String> randomStringList(int min, int max) {
int size = scaledRandomIntBetween(min, max);
List<String> list = new ArrayList<>();
@@ -150,21 +153,6 @@ public class DatafeedConfigTests extends AbstractXContentTestCase<DatafeedConfig> {
    public void testCheckValid_GivenNullJobId() {
        expectThrows(NullPointerException.class, () -> new DatafeedConfig.Builder(randomValidDatafeedId(), null));
}
- public void testCheckValid_GivenNullIndices() {
- DatafeedConfig.Builder conf = constructBuilder();
- expectThrows(NullPointerException.class, () -> conf.setIndices(null));
- }
-
- public void testCheckValid_GivenNullType() {
- DatafeedConfig.Builder conf = constructBuilder();
- expectThrows(NullPointerException.class, () -> conf.setTypes(null));
- }
-
- public void testCheckValid_GivenNullQuery() {
- DatafeedConfig.Builder conf = constructBuilder();
- expectThrows(NullPointerException.class, () -> conf.setQuery(null));
- }
-
public static String randomValidDatafeedId() {
CodepointSetGenerator generator = new CodepointSetGenerator("abcdefghijklmnopqrstuvwxyz".toCharArray());
return generator.ofCodePointsLength(random(), 10, 10);
diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/datafeed/DatafeedUpdateTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/datafeed/DatafeedUpdateTests.java
index 3dddad3c016..1c3723fd0a6 100644
--- a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/datafeed/DatafeedUpdateTests.java
+++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/datafeed/DatafeedUpdateTests.java
@@ -18,19 +18,16 @@
*/
package org.elasticsearch.client.ml.datafeed;
-import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.query.QueryBuilders;
-import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.test.AbstractXContentTestCase;
+import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
public class DatafeedUpdateTests extends AbstractXContentTestCase<DatafeedUpdate> {
@@ -54,7 +51,11 @@ public class DatafeedUpdateTests extends AbstractXContentTestCase<DatafeedUpdate> {
         if (randomBoolean()) {
-            builder.setQuery(QueryBuilders.termQuery(randomAlphaOfLength(10), randomAlphaOfLength(10)));
+            try {
+                builder.setQuery(QueryBuilders.termQuery(randomAlphaOfLength(10), randomAlphaOfLength(10)));
+            } catch (IOException e) {
+                throw new RuntimeException("Failed to serialize query", e);
+            }
         }
diff --git a/docs/java-rest/high-level/ml/put-datafeed.asciidoc b/docs/java-rest/high-level/ml/put-datafeed.asciidoc
new file mode 100644
--- /dev/null
+++ b/docs/java-rest/high-level/ml/put-datafeed.asciidoc
+[[java-rest-high-x-pack-ml-put-datafeed]]
+=== Put Datafeed API
+
+The Put Datafeed API can be used to create a new {ml} datafeed
+in the cluster. The API accepts a `PutDatafeedRequest` object
+as a request and returns a `PutDatafeedResponse`.
+
+[[java-rest-high-x-pack-ml-put-datafeed-request]]
+==== Put Datafeed Request
+
+A `PutDatafeedRequest` requires the following argument:
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/MlClientDocumentationIT.java[x-pack-ml-put-datafeed-request]
+--------------------------------------------------
+<1> The configuration of the {ml} datafeed to create
+
+[[java-rest-high-x-pack-ml-put-datafeed-config]]
+==== Datafeed Configuration
+
+The `DatafeedConfig` object contains all the details about the {ml} datafeed
+configuration.
+
+A `DatafeedConfig` requires the following arguments:
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/MlClientDocumentationIT.java[x-pack-ml-create-datafeed-config]
+--------------------------------------------------
+<1> The datafeed ID and the job ID
+<2> The indices that contain the data to retrieve and feed into the job
+
+==== Optional Arguments
+The following arguments are optional:
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/MlClientDocumentationIT.java[x-pack-ml-create-datafeed-config-set-chunking-config]
+--------------------------------------------------
+<1> Specifies how data searches are split into time chunks.
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/MlClientDocumentationIT.java[x-pack-ml-create-datafeed-config-set-frequency]
+--------------------------------------------------
+<1> The interval at which scheduled queries are made while the datafeed runs in real time.
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/MlClientDocumentationIT.java[x-pack-ml-create-datafeed-config-set-query]
+--------------------------------------------------
+<1> A query to filter the search results by. Defaults to the `match_all` query.
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/MlClientDocumentationIT.java[x-pack-ml-create-datafeed-config-set-query-delay]
+--------------------------------------------------
+<1> The time interval behind real time that data is queried.
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/MlClientDocumentationIT.java[x-pack-ml-create-datafeed-config-set-script-fields]
+--------------------------------------------------
+<1> Allows the use of script fields.
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/MlClientDocumentationIT.java[x-pack-ml-create-datafeed-config-set-scroll-size]
+--------------------------------------------------
+<1> The `size` parameter used in the searches.
+
+[[java-rest-high-x-pack-ml-put-datafeed-execution]]
+==== Execution
+
+The Put Datafeed API can be executed through a `MachineLearningClient`
+instance. Such an instance can be retrieved from a `RestHighLevelClient`
+using the `machineLearning()` method:
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/MlClientDocumentationIT.java[x-pack-ml-put-datafeed-execute]
+--------------------------------------------------
+
+[[java-rest-high-x-pack-ml-put-datafeed-response]]
+==== Response
+
+The returned `PutDatafeedResponse` returns the full representation of
+the new {ml} datafeed if it has been successfully created. This will
+contain the creation time and other fields initialized using
+default values:
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/MlClientDocumentationIT.java[x-pack-ml-put-datafeed-response]
+--------------------------------------------------
+<1> The created datafeed
+
+[[java-rest-high-x-pack-ml-put-datafeed-async]]
+==== Asynchronous Execution
+
+This request can be executed asynchronously:
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/MlClientDocumentationIT.java[x-pack-ml-put-datafeed-execute-async]
+--------------------------------------------------
+<1> The `PutDatafeedRequest` to execute and the `ActionListener` to use when
+the execution completes
+
+The asynchronous method does not block and returns immediately. Once it is
+completed the `ActionListener` is called back using the `onResponse` method
+if the execution successfully completed or using the `onFailure` method if
+it failed.
+
+A typical listener for `PutDatafeedResponse` looks like:
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/MlClientDocumentationIT.java[x-pack-ml-put-datafeed-execute-listener]
+--------------------------------------------------
+<1> Called when the execution is successfully completed. The response is
+provided as an argument
+<2> Called in case of failure. The raised exception is provided as an argument
diff --git a/docs/java-rest/high-level/ml/put-job.asciidoc b/docs/java-rest/high-level/ml/put-job.asciidoc
index d51bb63d405..8c726d63b16 100644
--- a/docs/java-rest/high-level/ml/put-job.asciidoc
+++ b/docs/java-rest/high-level/ml/put-job.asciidoc
@@ -142,7 +142,7 @@ This request can be executed asynchronously:
--------------------------------------------------
include-tagged::{doc-tests}/MlClientDocumentationIT.java[x-pack-ml-put-job-execute-async]
--------------------------------------------------
-<1> The `PutMlJobRequest` to execute and the `ActionListener` to use when
+<1> The `PutJobRequest` to execute and the `ActionListener` to use when
the execution completes
The asynchronous method does not block and returns immediately. Once it is
diff --git a/docs/java-rest/high-level/supported-apis.asciidoc b/docs/java-rest/high-level/supported-apis.asciidoc
index a6d173f6e27..0be681a14d1 100644
--- a/docs/java-rest/high-level/supported-apis.asciidoc
+++ b/docs/java-rest/high-level/supported-apis.asciidoc
@@ -220,6 +220,7 @@ The Java High Level REST Client supports the following Machine Learning APIs:
* <>
* <>
* <>
+* <<java-rest-high-x-pack-ml-put-datafeed>>
* <>
* <>
* <>
@@ -236,6 +237,7 @@ include::ml/open-job.asciidoc[]
include::ml/close-job.asciidoc[]
include::ml/update-job.asciidoc[]
include::ml/flush-job.asciidoc[]
+include::ml/put-datafeed.asciidoc[]
include::ml/get-job-stats.asciidoc[]
include::ml/forecast-job.asciidoc[]
include::ml/delete-forecast.asciidoc[]