[ML] Add indices_options to datafeed config and update (#52793) (#52905)

This adds a new configurable field called `indices_options`, which lets users set, on datafeed creation or update, the indices options used when the datafeed reads from its source indices.

This is necessary for the following use cases:
 - Reading from frozen indices
 - Allowing some indices matched by the datafeed's index patterns to not exist yet

These indices options can be set when creating or updating a datafeed, either as URL parameters or within the configuration object.
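For example, a minimal sketch against the high-level REST client builder added in this change (the datafeed id, job id, and index pattern are made up for illustration):

```java
import java.util.Collections;

import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.ml.datafeed.DatafeedConfig;

// ignoreUnavailable=true and allowNoIndices=true tolerate index patterns
// that resolve to nothing yet; wildcards expand to open indices only.
DatafeedConfig datafeed = DatafeedConfig.builder("my-datafeed", "my-job")
    .setIndices(Collections.singletonList("metrics-*"))
    .setIndicesOptions(IndicesOptions.fromOptions(true, true, true, false))
    .build();
```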

closes https://github.com/elastic/elasticsearch/issues/48056
Benjamin Trent, 2020-02-27 13:43:25 -05:00 (committed by GitHub)
parent f46b370e7a
commit eac38e9847
46 changed files with 732 additions and 111 deletions


@ -18,6 +18,7 @@
*/
package org.elasticsearch.client.ml.datafeed;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.ml.job.config.Job;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.bytes.BytesArray;
@ -63,6 +64,7 @@ public class DatafeedConfig implements ToXContentObject {
public static final ParseField CHUNKING_CONFIG = new ParseField("chunking_config");
public static final ParseField DELAYED_DATA_CHECK_CONFIG = new ParseField("delayed_data_check_config");
public static final ParseField MAX_EMPTY_SEARCHES = new ParseField("max_empty_searches");
public static final ParseField INDICES_OPTIONS = new ParseField("indices_options");
public static final ConstructingObjectParser<Builder, Void> PARSER = new ConstructingObjectParser<>(
"datafeed_config", true, a -> new Builder((String)a[0], (String)a[1]));
@ -90,6 +92,9 @@ public class DatafeedConfig implements ToXContentObject {
PARSER.declareObject(Builder::setChunkingConfig, ChunkingConfig.PARSER, CHUNKING_CONFIG);
PARSER.declareObject(Builder::setDelayedDataCheckConfig, DelayedDataCheckConfig.PARSER, DELAYED_DATA_CHECK_CONFIG);
PARSER.declareInt(Builder::setMaxEmptySearches, MAX_EMPTY_SEARCHES);
PARSER.declareObject(Builder::setIndicesOptions,
(p, c) -> IndicesOptions.fromMap(p.map(), new IndicesOptions(IndicesOptions.Option.NONE, IndicesOptions.WildcardStates.NONE)),
INDICES_OPTIONS);
}
private static BytesReference parseBytes(XContentParser parser) throws IOException {
@ -110,11 +115,12 @@ public class DatafeedConfig implements ToXContentObject {
private final ChunkingConfig chunkingConfig;
private final DelayedDataCheckConfig delayedDataCheckConfig;
private final Integer maxEmptySearches;
private final IndicesOptions indicesOptions;
private DatafeedConfig(String id, String jobId, TimeValue queryDelay, TimeValue frequency, List<String> indices, BytesReference query,
BytesReference aggregations, List<SearchSourceBuilder.ScriptField> scriptFields, Integer scrollSize,
ChunkingConfig chunkingConfig, DelayedDataCheckConfig delayedDataCheckConfig,
Integer maxEmptySearches) {
Integer maxEmptySearches, IndicesOptions indicesOptions) {
this.id = id;
this.jobId = jobId;
this.queryDelay = queryDelay;
@ -127,6 +133,7 @@ public class DatafeedConfig implements ToXContentObject {
this.chunkingConfig = chunkingConfig;
this.delayedDataCheckConfig = delayedDataCheckConfig;
this.maxEmptySearches = maxEmptySearches;
this.indicesOptions = indicesOptions;
}
public String getId() {
@ -177,6 +184,10 @@ public class DatafeedConfig implements ToXContentObject {
return maxEmptySearches;
}
public IndicesOptions getIndicesOptions() {
return indicesOptions;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
@ -216,6 +227,11 @@ public class DatafeedConfig implements ToXContentObject {
if (maxEmptySearches != null) {
builder.field(MAX_EMPTY_SEARCHES.getPreferredName(), maxEmptySearches);
}
if (indicesOptions != null) {
builder.startObject(INDICES_OPTIONS.getPreferredName());
indicesOptions.toXContent(builder, params);
builder.endObject();
}
builder.endObject();
return builder;
@ -257,7 +273,8 @@ public class DatafeedConfig implements ToXContentObject {
&& Objects.equals(this.scriptFields, that.scriptFields)
&& Objects.equals(this.chunkingConfig, that.chunkingConfig)
&& Objects.equals(this.delayedDataCheckConfig, that.delayedDataCheckConfig)
&& Objects.equals(this.maxEmptySearches, that.maxEmptySearches);
&& Objects.equals(this.maxEmptySearches, that.maxEmptySearches)
&& Objects.equals(this.indicesOptions, that.indicesOptions);
}
/**
@ -268,7 +285,7 @@ public class DatafeedConfig implements ToXContentObject {
@Override
public int hashCode() {
return Objects.hash(id, jobId, frequency, queryDelay, indices, asMap(query), scrollSize, asMap(aggregations), scriptFields,
chunkingConfig, delayedDataCheckConfig, maxEmptySearches);
chunkingConfig, delayedDataCheckConfig, maxEmptySearches, indicesOptions);
}
public static Builder builder(String id, String jobId) {
@ -289,6 +306,7 @@ public class DatafeedConfig implements ToXContentObject {
private ChunkingConfig chunkingConfig;
private DelayedDataCheckConfig delayedDataCheckConfig;
private Integer maxEmptySearches;
private IndicesOptions indicesOptions;
public Builder(String id, String jobId) {
this.id = Objects.requireNonNull(id, ID.getPreferredName());
@ -308,6 +326,7 @@ public class DatafeedConfig implements ToXContentObject {
this.chunkingConfig = config.chunkingConfig;
this.delayedDataCheckConfig = config.getDelayedDataCheckConfig();
this.maxEmptySearches = config.getMaxEmptySearches();
this.indicesOptions = config.indicesOptions;
}
public Builder setIndices(List<String> indices) {
@ -395,9 +414,14 @@ public class DatafeedConfig implements ToXContentObject {
return this;
}
public Builder setIndicesOptions(IndicesOptions indicesOptions) {
this.indicesOptions = indicesOptions;
return this;
}
public DatafeedConfig build() {
return new DatafeedConfig(id, jobId, queryDelay, frequency, indices, query, aggregations, scriptFields, scrollSize,
chunkingConfig, delayedDataCheckConfig, maxEmptySearches);
chunkingConfig, delayedDataCheckConfig, maxEmptySearches, indicesOptions);
}
private static BytesReference xContentToBytes(ToXContentObject object) throws IOException {


@ -19,6 +19,7 @@
package org.elasticsearch.client.ml.datafeed;
import org.elasticsearch.client.ml.job.config.Job;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
@ -80,6 +81,9 @@ public class DatafeedUpdate implements ToXContentObject {
DelayedDataCheckConfig.PARSER,
DatafeedConfig.DELAYED_DATA_CHECK_CONFIG);
PARSER.declareInt(Builder::setMaxEmptySearches, DatafeedConfig.MAX_EMPTY_SEARCHES);
PARSER.declareObject(Builder::setIndicesOptions,
(p, c) -> IndicesOptions.fromMap(p.map(), new IndicesOptions(IndicesOptions.Option.NONE, IndicesOptions.WildcardStates.NONE)),
DatafeedConfig.INDICES_OPTIONS);
}
private static BytesReference parseBytes(XContentParser parser) throws IOException {
@ -100,11 +104,12 @@ public class DatafeedUpdate implements ToXContentObject {
private final ChunkingConfig chunkingConfig;
private final DelayedDataCheckConfig delayedDataCheckConfig;
private final Integer maxEmptySearches;
private final IndicesOptions indicesOptions;
private DatafeedUpdate(String id, String jobId, TimeValue queryDelay, TimeValue frequency, List<String> indices, BytesReference query,
BytesReference aggregations, List<SearchSourceBuilder.ScriptField> scriptFields, Integer scrollSize,
ChunkingConfig chunkingConfig, DelayedDataCheckConfig delayedDataCheckConfig,
Integer maxEmptySearches) {
Integer maxEmptySearches, IndicesOptions indicesOptions) {
this.id = id;
this.jobId = jobId;
this.queryDelay = queryDelay;
@ -117,6 +122,7 @@ public class DatafeedUpdate implements ToXContentObject {
this.chunkingConfig = chunkingConfig;
this.delayedDataCheckConfig = delayedDataCheckConfig;
this.maxEmptySearches = maxEmptySearches;
this.indicesOptions = indicesOptions;
}
/**
@ -157,6 +163,11 @@ public class DatafeedUpdate implements ToXContentObject {
addOptionalField(builder, DatafeedConfig.SCROLL_SIZE, scrollSize);
addOptionalField(builder, DatafeedConfig.CHUNKING_CONFIG, chunkingConfig);
addOptionalField(builder, DatafeedConfig.MAX_EMPTY_SEARCHES, maxEmptySearches);
if (indicesOptions != null) {
builder.startObject(DatafeedConfig.INDICES_OPTIONS.getPreferredName());
indicesOptions.toXContent(builder, params);
builder.endObject();
}
builder.endObject();
return builder;
}
@ -211,6 +222,10 @@ public class DatafeedUpdate implements ToXContentObject {
return maxEmptySearches;
}
public IndicesOptions getIndicesOptions() {
return indicesOptions;
}
private static Map<String, Object> asMap(BytesReference bytesReference) {
return bytesReference == null ? null : XContentHelper.convertToMap(bytesReference, true, XContentType.JSON).v2();
}
@ -247,7 +262,8 @@ public class DatafeedUpdate implements ToXContentObject {
&& Objects.equals(this.delayedDataCheckConfig, that.delayedDataCheckConfig)
&& Objects.equals(this.scriptFields, that.scriptFields)
&& Objects.equals(this.chunkingConfig, that.chunkingConfig)
&& Objects.equals(this.maxEmptySearches, that.maxEmptySearches);
&& Objects.equals(this.maxEmptySearches, that.maxEmptySearches)
&& Objects.equals(this.indicesOptions, that.indicesOptions);
}
/**
@ -258,7 +274,7 @@ public class DatafeedUpdate implements ToXContentObject {
@Override
public int hashCode() {
return Objects.hash(id, jobId, frequency, queryDelay, indices, asMap(query), scrollSize, asMap(aggregations), scriptFields,
chunkingConfig, delayedDataCheckConfig, maxEmptySearches);
chunkingConfig, delayedDataCheckConfig, maxEmptySearches, indicesOptions);
}
public static Builder builder(String id) {
@ -279,6 +295,7 @@ public class DatafeedUpdate implements ToXContentObject {
private ChunkingConfig chunkingConfig;
private DelayedDataCheckConfig delayedDataCheckConfig;
private Integer maxEmptySearches;
private IndicesOptions indicesOptions;
public Builder(String id) {
this.id = Objects.requireNonNull(id, DatafeedConfig.ID.getPreferredName());
@ -297,6 +314,7 @@ public class DatafeedUpdate implements ToXContentObject {
this.chunkingConfig = config.chunkingConfig;
this.delayedDataCheckConfig = config.delayedDataCheckConfig;
this.maxEmptySearches = config.maxEmptySearches;
this.indicesOptions = config.indicesOptions;
}
@Deprecated
@ -381,9 +399,14 @@ public class DatafeedUpdate implements ToXContentObject {
return this;
}
public Builder setIndicesOptions(IndicesOptions indicesOptions) {
this.indicesOptions = indicesOptions;
return this;
}
public DatafeedUpdate build() {
return new DatafeedUpdate(id, jobId, queryDelay, frequency, indices, query, aggregations, scriptFields, scrollSize,
chunkingConfig, delayedDataCheckConfig, maxEmptySearches);
chunkingConfig, delayedDataCheckConfig, maxEmptySearches, indicesOptions);
}
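As a usage aside, a sketch of flipping only the options on an existing datafeed through this update builder (the datafeed id is illustrative):

```java
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.ml.datafeed.DatafeedUpdate;

// An update changes only the fields that are set; this one switches the
// datafeed to lenient resolution that also expands hidden indices.
DatafeedUpdate update = DatafeedUpdate.builder("my-datafeed")
    .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN)
    .build();
```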
private static BytesReference xContentToBytes(ToXContentObject object) throws IOException {


@ -19,6 +19,7 @@
package org.elasticsearch.client.ml.datafeed;
import com.carrotsearch.randomizedtesting.generators.CodepointSetGenerator;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
@ -109,6 +110,13 @@ public class DatafeedConfigTests extends AbstractXContentTestCase<DatafeedConfig
if (randomBoolean()) {
builder.setMaxEmptySearches(randomIntBetween(10, 100));
}
if (randomBoolean()) {
builder.setIndicesOptions(IndicesOptions.fromOptions(randomBoolean(),
randomBoolean(),
randomBoolean(),
randomBoolean(),
randomBoolean()));
}
return builder;
}


@ -18,6 +18,7 @@
*/
package org.elasticsearch.client.ml.datafeed;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.query.QueryBuilders;
@ -86,6 +87,13 @@ public class DatafeedUpdateTests extends AbstractXContentTestCase<DatafeedUpdate
if (randomBoolean()) {
builder.setMaxEmptySearches(randomIntBetween(10, 100));
}
if (randomBoolean()) {
builder.setIndicesOptions(IndicesOptions.fromOptions(randomBoolean(),
randomBoolean(),
randomBoolean(),
randomBoolean(),
randomBoolean()));
}
return builder.build();
}


@ -98,6 +98,11 @@ include::{docdir}/ml/ml-shared.asciidoc[tag=script-fields]
(Optional, unsigned integer)
include::{docdir}/ml/ml-shared.asciidoc[tag=scroll-size]
`indices_options`::
(Optional, object)
include::{docdir}/ml/ml-shared.asciidoc[tag=indices-options]
[[ml-put-datafeed-example]]
==== {api-examples-title}


@ -101,6 +101,9 @@ include::{docdir}/ml/ml-shared.asciidoc[tag=script-fields]
(Optional, unsigned integer)
include::{docdir}/ml/ml-shared.asciidoc[tag=scroll-size]
`indices_options`::
(Optional, object)
include::{docdir}/ml/ml-shared.asciidoc[tag=indices-options]
[[ml-update-datafeed-example]]
==== {api-examples-title}


@ -665,6 +665,19 @@ not be set to `false` on any {ml} nodes.
--
end::indices[]
tag::indices-options[]
Object specifying the index expansion options to use when the datafeed searches its source indices. For example:
```
{
  "expand_wildcards": ["all"],
  "ignore_unavailable": true,
  "allow_no_indices": false,
  "ignore_throttled": true
}
```
If not supplied, the default search-time indices options are used.
end::indices-options[]
tag::influencers[]
A comma separated list of influencer field names. Typically these can be the by,
over, or partition fields that are used in the detector configuration. You might


@ -9,6 +9,7 @@ import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
@ -33,8 +34,11 @@ public class PutDatafeedAction extends ActionType<PutDatafeedAction.Response> {
public static class Request extends AcknowledgedRequest<Request> implements ToXContentObject {
public static Request parseRequest(String datafeedId, XContentParser parser) {
public static Request parseRequest(String datafeedId, IndicesOptions indicesOptions, XContentParser parser) {
DatafeedConfig.Builder datafeed = DatafeedConfig.STRICT_PARSER.apply(parser, null);
if (datafeed.getIndicesOptions() == null) {
datafeed.setIndicesOptions(indicesOptions);
}
datafeed.setId(datafeedId);
return new Request(datafeed.build());
}
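Note the precedence here: the indices options passed into `parseRequest` are a fallback, applied only when the request body did not set `indices_options` itself. A minimal caller sketch mirroring the test usage elsewhere in this commit (`parser` is assumed to be positioned on a datafeed config body; the datafeed id is illustrative):

```java
// Body-supplied indices_options win; the default below is used only when
// the parsed config left them unset.
PutDatafeedAction.Request request =
    PutDatafeedAction.Request.parseRequest("my-datafeed", SearchRequest.DEFAULT_INDICES_OPTIONS, parser);
```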


@ -11,6 +11,8 @@ import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.ValidateActions;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.client.ElasticsearchClient;
@ -150,6 +152,9 @@ public class StartDatafeedAction extends ActionType<AcknowledgedResponse> {
params.setTimeout(TimeValue.parseTimeValue(val, TIMEOUT.getPreferredName())), TIMEOUT);
PARSER.declareString(DatafeedParams::setJobId, Job.ID);
PARSER.declareStringArray(DatafeedParams::setDatafeedIndices, INDICES);
PARSER.declareObject(DatafeedParams::setIndicesOptions,
(p, c) -> IndicesOptions.fromMap(p.map(), SearchRequest.DEFAULT_INDICES_OPTIONS),
DatafeedConfig.INDICES_OPTIONS);
}
static long parseDateOrThrow(String date, ParseField paramName, LongSupplier now) {
@ -193,6 +198,11 @@ public class StartDatafeedAction extends ActionType<AcknowledgedResponse> {
jobId = in.readOptionalString();
datafeedIndices = in.readStringList();
}
if (in.getVersion().onOrAfter(Version.V_7_7_0)) {
indicesOptions = IndicesOptions.readIndicesOptions(in);
} else {
indicesOptions = SearchRequest.DEFAULT_INDICES_OPTIONS;
}
}
DatafeedParams() {
@ -204,6 +214,7 @@ public class StartDatafeedAction extends ActionType<AcknowledgedResponse> {
private TimeValue timeout = TimeValue.timeValueSeconds(20);
private List<String> datafeedIndices = Collections.emptyList();
private String jobId;
private IndicesOptions indicesOptions = SearchRequest.DEFAULT_INDICES_OPTIONS;
public String getDatafeedId() {
@ -250,6 +261,15 @@ public class StartDatafeedAction extends ActionType<AcknowledgedResponse> {
this.datafeedIndices = datafeedIndices;
}
public IndicesOptions getIndicesOptions() {
return indicesOptions;
}
public DatafeedParams setIndicesOptions(IndicesOptions indicesOptions) {
this.indicesOptions = ExceptionsHelper.requireNonNull(indicesOptions, DatafeedConfig.INDICES_OPTIONS);
return this;
}
@Override
public String getWriteableName() {
return MlTasks.DATAFEED_TASK_NAME;
@ -270,6 +290,9 @@ public class StartDatafeedAction extends ActionType<AcknowledgedResponse> {
out.writeOptionalString(jobId);
out.writeStringCollection(datafeedIndices);
}
if (out.getVersion().onOrAfter(Version.V_7_7_0)) {
indicesOptions.writeIndicesOptions(out);
}
}
@Override
@ -287,13 +310,18 @@ public class StartDatafeedAction extends ActionType<AcknowledgedResponse> {
if (datafeedIndices.isEmpty() == false) {
builder.field(INDICES.getPreferredName(), datafeedIndices);
}
builder.startObject(DatafeedConfig.INDICES_OPTIONS.getPreferredName());
indicesOptions.toXContent(builder, params);
builder.endObject();
builder.endObject();
return builder;
}
@Override
public int hashCode() {
return Objects.hash(datafeedId, startTime, endTime, timeout, jobId, datafeedIndices);
return Objects.hash(datafeedId, startTime, endTime, timeout, jobId, datafeedIndices, indicesOptions);
}
@Override
@ -310,6 +338,7 @@ public class StartDatafeedAction extends ActionType<AcknowledgedResponse> {
Objects.equals(endTime, other.endTime) &&
Objects.equals(timeout, other.timeout) &&
Objects.equals(jobId, other.jobId) &&
Objects.equals(indicesOptions, other.indicesOptions) &&
Objects.equals(datafeedIndices, other.datafeedIndices);
}
}
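Since the params default to `SearchRequest.DEFAULT_INDICES_OPTIONS`, overriding them matters only when the start request should resolve indices differently from a plain search. A hedged sketch (the two-argument `DatafeedParams(String, long)` constructor is an assumption, as it is not shown in this hunk; the id is illustrative):

```java
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;

// strictExpandOpen() errors on missing concrete indices instead of
// silently ignoring them.
StartDatafeedAction.DatafeedParams params =
    new StartDatafeedAction.DatafeedParams("my-datafeed", 0L);
params.setIndicesOptions(IndicesOptions.strictExpandOpen());
```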


@ -7,9 +7,11 @@ package org.elasticsearch.xpack.core.ml.action;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContentObject;
@ -31,8 +33,11 @@ public class UpdateDatafeedAction extends ActionType<PutDatafeedAction.Response>
public static class Request extends AcknowledgedRequest<Request> implements ToXContentObject {
public static Request parseRequest(String datafeedId, XContentParser parser) {
public static Request parseRequest(String datafeedId, @Nullable IndicesOptions indicesOptions, XContentParser parser) {
DatafeedUpdate.Builder update = DatafeedUpdate.PARSER.apply(parser, null);
if (indicesOptions != null) {
update.setIndicesOptions(indicesOptions);
}
update.setId(datafeedId);
return new Request(update.build());
}


@ -9,6 +9,8 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.AbstractDiffable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
@ -92,6 +94,7 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
public static final ParseField HEADERS = new ParseField("headers");
public static final ParseField DELAYED_DATA_CHECK_CONFIG = new ParseField("delayed_data_check_config");
public static final ParseField MAX_EMPTY_SEARCHES = new ParseField("max_empty_searches");
public static final ParseField INDICES_OPTIONS = new ParseField("indices_options");
// These parsers follow the pattern that metadata is parsed leniently (to allow for enhancements), whilst config is parsed strictly
public static final ObjectParser<Builder, Void> LENIENT_PARSER = createParser(true);
@ -154,6 +157,9 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
ignoreUnknownFields ? DelayedDataCheckConfig.LENIENT_PARSER : DelayedDataCheckConfig.STRICT_PARSER,
DELAYED_DATA_CHECK_CONFIG);
parser.declareInt(Builder::setMaxEmptySearches, MAX_EMPTY_SEARCHES);
parser.declareObject(Builder::setIndicesOptions,
(p, c) -> IndicesOptions.fromMap(p.map(), SearchRequest.DEFAULT_INDICES_OPTIONS),
INDICES_OPTIONS);
return parser;
}
@ -179,11 +185,12 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
private final Map<String, String> headers;
private final DelayedDataCheckConfig delayedDataCheckConfig;
private final Integer maxEmptySearches;
private final IndicesOptions indicesOptions;
private DatafeedConfig(String id, String jobId, TimeValue queryDelay, TimeValue frequency, List<String> indices,
QueryProvider queryProvider, AggProvider aggProvider, List<SearchSourceBuilder.ScriptField> scriptFields,
Integer scrollSize, ChunkingConfig chunkingConfig, Map<String, String> headers,
DelayedDataCheckConfig delayedDataCheckConfig, Integer maxEmptySearches) {
DelayedDataCheckConfig delayedDataCheckConfig, Integer maxEmptySearches, IndicesOptions indicesOptions) {
this.id = id;
this.jobId = jobId;
this.queryDelay = queryDelay;
@ -197,6 +204,7 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
this.headers = Collections.unmodifiableMap(headers);
this.delayedDataCheckConfig = delayedDataCheckConfig;
this.maxEmptySearches = maxEmptySearches;
this.indicesOptions = ExceptionsHelper.requireNonNull(indicesOptions, INDICES_OPTIONS);
}
public DatafeedConfig(StreamInput in) throws IOException {
@ -242,6 +250,11 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
} else {
maxEmptySearches = null;
}
if (in.getVersion().onOrAfter(Version.V_7_7_0)) {
indicesOptions = IndicesOptions.readIndicesOptions(in);
} else {
indicesOptions = SearchRequest.DEFAULT_INDICES_OPTIONS;
}
}
/**
@ -414,6 +427,10 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
return maxEmptySearches;
}
public IndicesOptions getIndicesOptions() {
return indicesOptions;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(id);
@ -455,6 +472,9 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
if (out.getVersion().onOrAfter(Version.V_7_5_0)) {
out.writeOptionalVInt(maxEmptySearches);
}
if (out.getVersion().onOrAfter(Version.V_7_7_0)) {
indicesOptions.writeIndicesOptions(out);
}
}
@Override
@ -494,6 +514,10 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
if (maxEmptySearches != null) {
builder.field(MAX_EMPTY_SEARCHES.getPreferredName(), maxEmptySearches);
}
builder.startObject(INDICES_OPTIONS.getPreferredName());
indicesOptions.toXContent(builder, params);
builder.endObject();
builder.endObject();
return builder;
}
@ -527,13 +551,14 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
&& Objects.equals(this.chunkingConfig, that.chunkingConfig)
&& Objects.equals(this.headers, that.headers)
&& Objects.equals(this.delayedDataCheckConfig, that.delayedDataCheckConfig)
&& Objects.equals(this.maxEmptySearches, that.maxEmptySearches);
&& Objects.equals(this.maxEmptySearches, that.maxEmptySearches)
&& Objects.equals(this.indicesOptions, that.indicesOptions);
}
@Override
public int hashCode() {
return Objects.hash(id, jobId, frequency, queryDelay, indices, queryProvider, scrollSize, aggProvider, scriptFields, chunkingConfig,
headers, delayedDataCheckConfig, maxEmptySearches);
headers, delayedDataCheckConfig, maxEmptySearches, indicesOptions);
}
@Override
@ -607,6 +632,7 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
private Map<String, String> headers = Collections.emptyMap();
private DelayedDataCheckConfig delayedDataCheckConfig = DelayedDataCheckConfig.defaultDelayedDataCheckConfig();
private Integer maxEmptySearches;
private IndicesOptions indicesOptions;
public Builder() { }
@ -630,6 +656,7 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
this.headers = new HashMap<>(config.headers);
this.delayedDataCheckConfig = config.getDelayedDataCheckConfig();
this.maxEmptySearches = config.getMaxEmptySearches();
this.indicesOptions = config.indicesOptions;
}
public void setId(String datafeedId) {
@ -735,6 +762,15 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
}
}
public Builder setIndicesOptions(IndicesOptions indicesOptions) {
this.indicesOptions = indicesOptions;
return this;
}
public IndicesOptions getIndicesOptions() {
return this.indicesOptions;
}
public DatafeedConfig build() {
ExceptionsHelper.requireNonNull(id, ID.getPreferredName());
ExceptionsHelper.requireNonNull(jobId, Job.ID.getPreferredName());
@ -749,8 +785,11 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
setDefaultChunkingConfig();
setDefaultQueryDelay();
if (indicesOptions == null) {
indicesOptions = SearchRequest.DEFAULT_INDICES_OPTIONS;
}
return new DatafeedConfig(id, jobId, queryDelay, frequency, indices, queryProvider, aggProvider, scriptFields, scrollSize,
chunkingConfig, headers, delayedDataCheckConfig, maxEmptySearches);
chunkingConfig, headers, delayedDataCheckConfig, maxEmptySearches, indicesOptions);
}
void validateScriptFields() {


@ -7,6 +7,8 @@ package org.elasticsearch.xpack.core.ml.datafeed;
import org.apache.logging.log4j.LogManager;
import org.elasticsearch.Version;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
@ -82,6 +84,9 @@ public class DatafeedUpdate implements Writeable, ToXContentObject {
DelayedDataCheckConfig.STRICT_PARSER,
DatafeedConfig.DELAYED_DATA_CHECK_CONFIG);
PARSER.declareInt(Builder::setMaxEmptySearches, DatafeedConfig.MAX_EMPTY_SEARCHES);
PARSER.declareObject(Builder::setIndicesOptions,
(p, c) -> IndicesOptions.fromMap(p.map(), SearchRequest.DEFAULT_INDICES_OPTIONS),
DatafeedConfig.INDICES_OPTIONS);
}
private final String id;
@ -96,12 +101,13 @@ public class DatafeedUpdate implements Writeable, ToXContentObject {
private final ChunkingConfig chunkingConfig;
private final DelayedDataCheckConfig delayedDataCheckConfig;
private final Integer maxEmptySearches;
private final IndicesOptions indicesOptions;
private DatafeedUpdate(String id, String jobId, TimeValue queryDelay, TimeValue frequency, List<String> indices,
QueryProvider queryProvider, AggProvider aggProvider,
List<SearchSourceBuilder.ScriptField> scriptFields,
Integer scrollSize, ChunkingConfig chunkingConfig, DelayedDataCheckConfig delayedDataCheckConfig,
Integer maxEmptySearches) {
Integer maxEmptySearches, IndicesOptions indicesOptions) {
this.id = id;
this.jobId = jobId;
this.queryDelay = queryDelay;
@ -114,6 +120,7 @@ public class DatafeedUpdate implements Writeable, ToXContentObject {
this.chunkingConfig = chunkingConfig;
this.delayedDataCheckConfig = delayedDataCheckConfig;
this.maxEmptySearches = maxEmptySearches;
this.indicesOptions = indicesOptions;
}
public DatafeedUpdate(StreamInput in) throws IOException {
@ -156,6 +163,11 @@ public class DatafeedUpdate implements Writeable, ToXContentObject {
} else {
maxEmptySearches = null;
}
if (in.getVersion().onOrAfter(Version.V_7_7_0)) {
indicesOptions = in.readBoolean() ? IndicesOptions.readIndicesOptions(in) : null;
} else {
indicesOptions = null;
}
}
/**
@ -204,6 +216,14 @@ public class DatafeedUpdate implements Writeable, ToXContentObject {
if (out.getVersion().onOrAfter(Version.V_7_5_0)) {
out.writeOptionalInt(maxEmptySearches);
}
if (out.getVersion().onOrAfter(Version.V_7_7_0)) {
if (indicesOptions != null) {
out.writeBoolean(true);
indicesOptions.writeIndicesOptions(out);
} else {
out.writeBoolean(false);
}
}
}
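Unlike `DatafeedConfig`, where `build()` always fills in indices options, an update may legitimately leave them unset, so the wire format above writes a presence flag first. A standalone round trip of that pattern, assuming a `BytesStreamOutput` and ignoring the version gating:

```java
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;

IndicesOptions indicesOptions = IndicesOptions.lenientExpandOpen(); // may be null on an update
BytesStreamOutput out = new BytesStreamOutput();
if (indicesOptions != null) {
    out.writeBoolean(true);                  // presence flag
    indicesOptions.writeIndicesOptions(out);
} else {
    out.writeBoolean(false);
}
StreamInput in = out.bytes().streamInput();
IndicesOptions read = in.readBoolean() ? IndicesOptions.readIndicesOptions(in) : null;
```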
@Override
@ -235,7 +255,11 @@ public class DatafeedUpdate implements Writeable, ToXContentObject {
addOptionalField(builder, DatafeedConfig.CHUNKING_CONFIG, chunkingConfig);
addOptionalField(builder, DatafeedConfig.DELAYED_DATA_CHECK_CONFIG, delayedDataCheckConfig);
addOptionalField(builder, DatafeedConfig.MAX_EMPTY_SEARCHES, maxEmptySearches);
if (indicesOptions != null) {
builder.startObject(DatafeedConfig.INDICES_OPTIONS.getPreferredName());
indicesOptions.toXContent(builder, params);
builder.endObject();
}
builder.endObject();
return builder;
}
@ -308,6 +332,10 @@ public class DatafeedUpdate implements Writeable, ToXContentObject {
return maxEmptySearches;
}
public IndicesOptions getIndicesOptions() {
return indicesOptions;
}
/**
* Applies the update to the given {@link DatafeedConfig}
* @return a new {@link DatafeedConfig} that contains the update
@ -355,6 +383,9 @@ public class DatafeedUpdate implements Writeable, ToXContentObject {
if (maxEmptySearches != null) {
builder.setMaxEmptySearches(maxEmptySearches);
}
if (indicesOptions != null) {
builder.setIndicesOptions(indicesOptions);
}
if (headers.isEmpty() == false) {
// Adjust the request, adding security headers from the current thread context
@ -395,13 +426,14 @@ public class DatafeedUpdate implements Writeable, ToXContentObject {
&& Objects.equals(this.delayedDataCheckConfig, that.delayedDataCheckConfig)
&& Objects.equals(this.scriptFields, that.scriptFields)
&& Objects.equals(this.chunkingConfig, that.chunkingConfig)
&& Objects.equals(this.maxEmptySearches, that.maxEmptySearches);
&& Objects.equals(this.maxEmptySearches, that.maxEmptySearches)
&& Objects.equals(this.indicesOptions, that.indicesOptions);
}
@Override
public int hashCode() {
return Objects.hash(id, jobId, frequency, queryDelay, indices, queryProvider, scrollSize, aggProvider, scriptFields, chunkingConfig,
delayedDataCheckConfig, maxEmptySearches);
delayedDataCheckConfig, maxEmptySearches, indicesOptions);
}
@Override
@ -420,7 +452,8 @@ public class DatafeedUpdate implements Writeable, ToXContentObject {
&& (delayedDataCheckConfig == null || Objects.equals(delayedDataCheckConfig, datafeed.getDelayedDataCheckConfig()))
&& (chunkingConfig == null || Objects.equals(chunkingConfig, datafeed.getChunkingConfig()))
&& (maxEmptySearches == null || Objects.equals(maxEmptySearches, datafeed.getMaxEmptySearches())
|| (maxEmptySearches == -1 && datafeed.getMaxEmptySearches() == null));
|| (maxEmptySearches == -1 && datafeed.getMaxEmptySearches() == null))
&& (indicesOptions == null || Objects.equals(indicesOptions, datafeed.getIndicesOptions()));
}
public static class Builder {
@ -437,6 +470,7 @@ public class DatafeedUpdate implements Writeable, ToXContentObject {
private ChunkingConfig chunkingConfig;
private DelayedDataCheckConfig delayedDataCheckConfig;
private Integer maxEmptySearches;
private IndicesOptions indicesOptions;
public Builder() {
}
@ -458,6 +492,7 @@ public class DatafeedUpdate implements Writeable, ToXContentObject {
this.chunkingConfig = config.chunkingConfig;
this.delayedDataCheckConfig = config.delayedDataCheckConfig;
this.maxEmptySearches = config.maxEmptySearches;
this.indicesOptions = config.indicesOptions;
}
public Builder setId(String datafeedId) {
@ -535,9 +570,14 @@ public class DatafeedUpdate implements Writeable, ToXContentObject {
return this;
}
public Builder setIndicesOptions(IndicesOptions indicesOptions) {
this.indicesOptions = indicesOptions;
return this;
}
public DatafeedUpdate build() {
return new DatafeedUpdate(id, jobId, queryDelay, frequency, indices, queryProvider, aggProvider, scriptFields, scrollSize,
chunkingConfig, delayedDataCheckConfig, maxEmptySearches);
chunkingConfig, delayedDataCheckConfig, maxEmptySearches, indicesOptions);
}
}
}


@ -287,6 +287,7 @@ public final class ReservedFieldNames {
DatafeedConfig.CHUNKING_CONFIG.getPreferredName(),
DatafeedConfig.HEADERS.getPreferredName(),
DatafeedConfig.DELAYED_DATA_CHECK_CONFIG.getPreferredName(),
DatafeedConfig.INDICES_OPTIONS.getPreferredName(),
DelayedDataCheckConfig.ENABLED.getPreferredName(),
DelayedDataCheckConfig.CHECK_WINDOW.getPreferredName(),


@ -290,6 +290,10 @@
"indices" : {
"type" : "keyword"
},
"indices_options": {
"type" : "object",
"enabled" : false
},
"job_id" : {
"type" : "keyword"
},


@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.core.ml.action;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
@ -13,7 +14,6 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.test.AbstractSerializingTestCase;
import org.elasticsearch.xpack.core.ml.action.PutDatafeedAction.Request;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfigTests;
import org.junit.Before;
@ -30,9 +30,7 @@ public class PutDatafeedActionRequestTests extends AbstractSerializingTestCase<R
@Override
protected Request createTestInstance() {
DatafeedConfig.Builder datafeedConfig = new DatafeedConfig.Builder(datafeedId, randomAlphaOfLength(10));
datafeedConfig.setIndices(Collections.singletonList(randomAlphaOfLength(10)));
return new Request(datafeedConfig.build());
return new Request(DatafeedConfigTests.createRandomizedDatafeedConfig(randomAlphaOfLength(10), datafeedId, 3600));
}
@Override
@ -47,7 +45,7 @@ public class PutDatafeedActionRequestTests extends AbstractSerializingTestCase<R
@Override
protected Request doParseInstance(XContentParser parser) {
return Request.parseRequest(datafeedId, parser);
return Request.parseRequest(datafeedId, SearchRequest.DEFAULT_INDICES_OPTIONS, parser);
}
@Override


@ -45,7 +45,7 @@ public class UpdateDatafeedActionRequestTests extends AbstractSerializingTestCas
@Override
protected Request doParseInstance(XContentParser parser) {
return Request.parseRequest(datafeedId, parser);
return Request.parseRequest(datafeedId, null, parser);
}
@Override


@ -9,6 +9,8 @@ import com.carrotsearch.randomizedtesting.generators.CodepointSetGenerator;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.Version;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
@ -57,6 +59,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import static org.elasticsearch.xpack.core.ml.utils.QueryProviderTests.createRandomValidQueryProvider;
@ -99,11 +102,15 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedCon
}
public static DatafeedConfig createRandomizedDatafeedConfig(String jobId, long bucketSpanMillis) {
return createRandomizedDatafeedConfigBuilder(jobId, bucketSpanMillis).build();
return createRandomizedDatafeedConfig(jobId, randomValidDatafeedId(), bucketSpanMillis);
}
private static DatafeedConfig.Builder createRandomizedDatafeedConfigBuilder(String jobId, long bucketSpanMillis) {
DatafeedConfig.Builder builder = new DatafeedConfig.Builder(randomValidDatafeedId(), jobId);
public static DatafeedConfig createRandomizedDatafeedConfig(String jobId, String datafeedId, long bucketSpanMillis) {
return createRandomizedDatafeedConfigBuilder(jobId, datafeedId, bucketSpanMillis).build();
}
private static DatafeedConfig.Builder createRandomizedDatafeedConfigBuilder(String jobId, String datafeedId, long bucketSpanMillis) {
DatafeedConfig.Builder builder = new DatafeedConfig.Builder(datafeedId, jobId);
builder.setIndices(randomStringList(1, 10));
if (randomBoolean()) {
builder.setQueryProvider(createRandomValidQueryProvider(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10)));
@ -154,6 +161,12 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedCon
if (randomBoolean()) {
builder.setMaxEmptySearches(randomIntBetween(10, 100));
}
builder.setIndicesOptions(IndicesOptions.fromParameters(
randomFrom(IndicesOptions.WildcardStates.values()).name().toLowerCase(Locale.ROOT),
Boolean.toString(randomBoolean()),
Boolean.toString(randomBoolean()),
Boolean.toString(randomBoolean()),
SearchRequest.DEFAULT_INDICES_OPTIONS));
return builder;
}
@ -340,7 +353,7 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedCon
}
public void testToXContentForInternalStorage() throws IOException {
DatafeedConfig.Builder builder = createRandomizedDatafeedConfigBuilder("foo", 300);
DatafeedConfig.Builder builder = createRandomizedDatafeedConfigBuilder("foo", randomValidDatafeedId(), 300);
// headers are only persisted to cluster state
Map<String, String> headers = new HashMap<>();
@ -843,7 +856,7 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedCon
@Override
protected DatafeedConfig mutateInstance(DatafeedConfig instance) throws IOException {
DatafeedConfig.Builder builder = new DatafeedConfig.Builder(instance);
switch (between(0, 10)) {
switch (between(0, 11)) {
case 0:
builder.setId(instance.getId() + randomValidDatafeedId());
break;
@ -912,6 +925,14 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedCon
builder.setMaxEmptySearches(instance.getMaxEmptySearches() + 1);
}
break;
case 11:
builder.setIndicesOptions(IndicesOptions.fromParameters(
randomFrom(IndicesOptions.WildcardStates.values()).name().toLowerCase(Locale.ROOT),
Boolean.toString(instance.getIndicesOptions().ignoreUnavailable() == false),
Boolean.toString(instance.getIndicesOptions().allowNoIndices() == false),
Boolean.toString(instance.getIndicesOptions().ignoreThrottled() == false),
SearchRequest.DEFAULT_INDICES_OPTIONS));
break;
default:
throw new AssertionError("Illegal randomisation branch");
}


@ -6,6 +6,8 @@
package org.elasticsearch.xpack.core.ml.datafeed;
import org.elasticsearch.Version;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
@ -46,6 +48,7 @@ import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import static org.elasticsearch.xpack.core.ml.datafeed.AggProviderTests.createRandomValidAggProvider;
import static org.elasticsearch.xpack.core.ml.utils.QueryProviderTests.createRandomValidQueryProvider;
@ -124,6 +127,14 @@ public class DatafeedUpdateTests extends AbstractSerializingTestCase<DatafeedUpd
if (randomBoolean()) {
builder.setMaxEmptySearches(randomBoolean() ? -1 : randomIntBetween(10, 100));
}
if (randomBoolean()) {
builder.setIndicesOptions(IndicesOptions.fromParameters(
randomFrom(IndicesOptions.WildcardStates.values()).name().toLowerCase(Locale.ROOT),
Boolean.toString(randomBoolean()),
Boolean.toString(randomBoolean()),
Boolean.toString(randomBoolean()),
SearchRequest.DEFAULT_INDICES_OPTIONS));
}
return builder.build();
}
@ -271,6 +282,16 @@ public class DatafeedUpdateTests extends AbstractSerializingTestCase<DatafeedUpd
assertThat(updatedDatafeed.getAggregations(), equalTo(aggProvider.getAggs()));
}
public void testApply_givenIndicesOptions() {
DatafeedConfig datafeed = DatafeedConfigTests.createRandomizedDatafeedConfig("foo");
DatafeedConfig updatedDatafeed = new DatafeedUpdate.Builder(datafeed.getId())
.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN)
.build()
.apply(datafeed, Collections.emptyMap());
assertThat(datafeed.getIndicesOptions(), is(not(equalTo(updatedDatafeed.getIndicesOptions()))));
assertThat(updatedDatafeed.getIndicesOptions(), equalTo(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN));
}
public void testApply_GivenRandomUpdates_AssertImmutability() {
for (int i = 0; i < 100; ++i) {
DatafeedConfig datafeed = DatafeedConfigTests.createRandomizedDatafeedConfig(JobTests.randomValidJobId());
@ -342,7 +363,7 @@ public class DatafeedUpdateTests extends AbstractSerializingTestCase<DatafeedUpd
@Override
protected DatafeedUpdate mutateInstance(DatafeedUpdate instance) throws IOException {
DatafeedUpdate.Builder builder = new DatafeedUpdate.Builder(instance);
switch (between(0, 10)) {
switch (between(0, 11)) {
case 0:
builder.setId(instance.getId() + DatafeedConfigTests.randomValidDatafeedId());
break;
@ -423,6 +444,23 @@ public class DatafeedUpdateTests extends AbstractSerializingTestCase<DatafeedUpd
builder.setMaxEmptySearches(instance.getMaxEmptySearches() + 100);
}
break;
case 11:
if (instance.getIndicesOptions() != null) {
builder.setIndicesOptions(IndicesOptions.fromParameters(
randomFrom(IndicesOptions.WildcardStates.values()).name().toLowerCase(Locale.ROOT),
Boolean.toString(instance.getIndicesOptions().ignoreUnavailable() == false),
Boolean.toString(instance.getIndicesOptions().allowNoIndices() == false),
Boolean.toString(instance.getIndicesOptions().ignoreThrottled() == false),
SearchRequest.DEFAULT_INDICES_OPTIONS));
} else {
builder.setIndicesOptions(IndicesOptions.fromParameters(
randomFrom(IndicesOptions.WildcardStates.values()).name().toLowerCase(Locale.ROOT),
Boolean.toString(randomBoolean()),
Boolean.toString(randomBoolean()),
Boolean.toString(randomBoolean()),
SearchRequest.DEFAULT_INDICES_OPTIONS));
}
break;
default:
throw new AssertionError("Illegal randomisation branch");
}


@ -389,6 +389,75 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
assertThat(jobStatsResponseAsString, containsString("\"missing_field_count\":0"));
}
public void testLookbackWithIndicesOptions() throws Exception {
String jobId = "test-lookback-only-with-indices-options";
Request createJobRequest = new Request("PUT", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId);
createJobRequest.setJsonEntity("{\n"
+ " \"description\": \"custom indices options\",\n"
+ " \"analysis_config\": {\n"
+ " \"bucket_span\": \"15m\",\n"
+ " \"detectors\": [\n"
+ " {\n"
+ " \"function\": \"count\"\n"
+ " }\n"
+ " ]\n"
+ " },"
+ " \"data_description\": {\"time_field\": \"time\"}\n"
+ "}");
client().performRequest(createJobRequest);
String datafeedId = jobId + "-datafeed";
new DatafeedBuilder(datafeedId, jobId, "*hidden-*")
.setIndicesOptions("{" +
"\"expand_wildcards\": [\"all\"]," +
"\"allow_no_indices\": true"+
"}")
.build();
StringBuilder bulk = new StringBuilder();
Request createHiddenIndex = new Request("PUT", "/.hidden-index");
createGeoData.setJsonEntity("{"
+ " \"mappings\": {"
+ " \"properties\": {"
+ " \"time\": { \"type\":\"date\"},"
+ " \"value\": { \"type\":\"long\"}"
+ " }"
+ " }, \"settings\": {\"index.hidden\": true} "
+ "}");
client().performRequest(createHiddenIndex);
bulk.append("{\"index\": {\"_index\": \".hidden-index\", \"_id\": 1}}\n");
bulk.append("{\"time\":\"2016-06-01T00:00:00Z\",\"value\": 1000}\n");
bulk.append("{\"index\": {\"_index\": \".hidden-index\", \"_id\": 2}}\n");
bulk.append("{\"time\":\"2016-06-01T00:05:00Z\",\"value\":1500}\n");
bulk.append("{\"index\": {\"_index\": \".hidden-index\", \"_id\": 3}}\n");
bulk.append("{\"time\":\"2016-06-01T00:10:00Z\",\"value\":1600}\n");
bulk.append("{\"index\": {\"_index\": \".hidden-index\", \"_id\": 4}}\n");
bulk.append("{\"time\":\"2016-06-01T00:15:00Z\",\"value\":100}\n");
bulk.append("{\"index\": {\"_index\": \".hidden-index\", \"_id\": 5}}\n");
bulk.append("{\"time\":\"2016-06-01T00:20:00Z\",\"value\":1}\n");
bulk.append("{\"index\": {\"_index\": \".hidden-index\", \"_id\": 6}}\n");
bulk.append("{\"time\":\"2016-06-01T00:25:00Z\",\"value\":1500}\n");
bulk.append("{\"index\": {\"_index\": \".hidden-index\", \"_id\": 7}}\n");
bulk.append("{\"time\":\"2016-06-01T00:30:00Z\",\"value\":1500}\n");
bulk.append("{\"index\": {\"_index\": \".hidden-index\", \"_id\": 8}}\n");
bulk.append("{\"time\":\"2016-06-01T00:40:00Z\",\"value\":2100}\n");
bulk.append("{\"index\": {\"_index\": \".hidden-index\", \"_id\": 9}}\n");
bulk.append("{\"time\":\"2016-06-01T00:41:00Z\",\"value\":0}\n");
bulkIndex(bulk.toString());
openJob(client(), jobId);
startDatafeedAndWaitUntilStopped(datafeedId);
waitUntilJobIsClosed(jobId);
Response jobStatsResponse = client().performRequest(
new Request("GET", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats"));
String jobStatsResponseAsString = EntityUtils.toString(jobStatsResponse.getEntity());
assertThat(jobStatsResponseAsString, containsString("\"input_record_count\":9"));
assertThat(jobStatsResponseAsString, containsString("\"processed_record_count\":9"));
assertThat(jobStatsResponseAsString, containsString("\"missing_field_count\":0"));
}
public void testLookbackOnlyGivenEmptyIndex() throws Exception {
new LookbackOnlyTestHelper("test-lookback-only-given-empty-index", "airline-data-empty")
.setShouldSucceedInput(false).setShouldSucceedProcessing(false).execute();
@ -1229,6 +1298,7 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
String aggregations;
String authHeader = BASIC_AUTH_VALUE_SUPER_USER;
String chunkingTimespan;
String indicesOptions;
DatafeedBuilder(String datafeedId, String jobId, String index) {
this.datafeedId = datafeedId;
@ -1261,6 +1331,11 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
return this;
}
DatafeedBuilder setIndicesOptions(String indicesOptions) {
this.indicesOptions = indicesOptions;
return this;
}
Response build() throws IOException {
Request request = new Request("PUT", MachineLearning.BASE_PATH + "datafeeds/" + datafeedId);
request.setJsonEntity("{"
@ -1268,6 +1343,7 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
+ (source ? ",\"_source\":true" : "")
+ (scriptedFields == null ? "" : ",\"script_fields\":" + scriptedFields)
+ (aggregations == null ? "" : ",\"aggs\":" + aggregations)
+ (indicesOptions == null ? "" : ",\"indices_options\":" + indicesOptions)
+ (chunkingTimespan == null ? "" :
",\"chunking_config\":{\"mode\":\"MANUAL\",\"time_span\":\"" + chunkingTimespan + "\"}")
+ "}");


@ -242,6 +242,7 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction<Star
DatafeedConfig datafeedConfig = datafeedBuilder.build();
params.setDatafeedIndices(datafeedConfig.getIndices());
params.setJobId(datafeedConfig.getJobId());
params.setIndicesOptions(datafeedConfig.getIndicesOptions());
datafeedConfigHolder.set(datafeedConfig);
jobConfigProvider.getJob(datafeedConfig.getJobId(), jobListener);
} catch (Exception e) {
@ -377,12 +378,17 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction<Star
public PersistentTasksCustomMetaData.Assignment getAssignment(StartDatafeedAction.DatafeedParams params,
ClusterState clusterState) {
return new DatafeedNodeSelector(clusterState, resolver, params.getDatafeedId(), params.getJobId(),
params.getDatafeedIndices()).selectNode();
params.getDatafeedIndices(), params.getIndicesOptions()).selectNode();
}
@Override
public void validate(StartDatafeedAction.DatafeedParams params, ClusterState clusterState) {
new DatafeedNodeSelector(clusterState, resolver, params.getDatafeedId(), params.getJobId(), params.getDatafeedIndices())
new DatafeedNodeSelector(clusterState,
resolver,
params.getDatafeedId(),
params.getJobId(),
params.getDatafeedIndices(),
params.getIndicesOptions())
.checkDatafeedTaskCanBeCreated();
}


@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml.datafeed;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.ClusterState;
@ -37,9 +38,10 @@ public class DatafeedNodeSelector {
private final PersistentTasksCustomMetaData.PersistentTask<?> jobTask;
private final ClusterState clusterState;
private final IndexNameExpressionResolver resolver;
private final IndicesOptions indicesOptions;
public DatafeedNodeSelector(ClusterState clusterState, IndexNameExpressionResolver resolver, String datafeedId,
String jobId, List<String> datafeedIndices) {
String jobId, List<String> datafeedIndices, IndicesOptions indicesOptions) {
PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
this.datafeedId = datafeedId;
this.jobId = jobId;
@ -47,6 +49,7 @@ public class DatafeedNodeSelector {
this.jobTask = MlTasks.getJobTask(jobId, tasks);
this.clusterState = Objects.requireNonNull(clusterState);
this.resolver = Objects.requireNonNull(resolver);
this.indicesOptions = Objects.requireNonNull(indicesOptions);
}
public void checkDatafeedTaskCanBeCreated() {
@ -117,25 +120,28 @@ public class DatafeedNodeSelector {
}
String[] concreteIndices;
String reason = "cannot start datafeed [" + datafeedId + "] because index ["
+ index + "] does not exist, is closed, or is still initializing.";
try {
concreteIndices = resolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpen(), index);
concreteIndices = resolver.concreteIndexNames(clusterState, indicesOptions, index);
if (concreteIndices.length == 0) {
return new AssignmentFailure(reason, true);
return new AssignmentFailure("cannot start datafeed [" + datafeedId + "] because index ["
+ index + "] does not exist, is closed, or is still initializing.", true);
}
} catch (Exception e) {
LOGGER.debug(reason, e);
return new AssignmentFailure(reason, true);
String msg = new ParameterizedMessage("failed resolving indices given [{}] and indices_options [{}]",
index,
indicesOptions).getFormattedMessage();
LOGGER.debug("[" + datafeedId + "] " + msg, e);
return new AssignmentFailure(
"cannot start datafeed [" + datafeedId + "] because it " + msg + " with exception [" + e.getMessage() + "]",
true);
}
for (String concreteIndex : concreteIndices) {
IndexRoutingTable routingTable = clusterState.getRoutingTable().index(concreteIndex);
if (routingTable == null || !routingTable.allPrimaryShardsActive()) {
reason = "cannot start datafeed [" + datafeedId + "] because index ["
+ concreteIndex + "] does not have all primary shards active yet.";
return new AssignmentFailure(reason, false);
return new AssignmentFailure("cannot start datafeed [" + datafeedId + "] because index ["
+ concreteIndex + "] does not have all primary shards active yet.", false);
}
}
}


@ -8,6 +8,7 @@ package org.elasticsearch.xpack.ml.datafeed.delayeddatacheck;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.query.QueryBuilder;
@ -27,6 +28,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
@ -46,15 +48,17 @@ public class DatafeedDelayedDataDetector implements DelayedDataDetector {
private final String jobId;
private final QueryBuilder datafeedQuery;
private final String[] datafeedIndices;
private final IndicesOptions indicesOptions;
DatafeedDelayedDataDetector(long bucketSpan, long window, String jobId, String timeField, QueryBuilder datafeedQuery,
String[] datafeedIndices, Client client) {
String[] datafeedIndices, IndicesOptions indicesOptions, Client client) {
this.bucketSpan = bucketSpan;
this.window = window;
this.jobId = jobId;
this.timeField = timeField;
this.datafeedQuery = datafeedQuery;
this.datafeedIndices = datafeedIndices;
this.indicesOptions = Objects.requireNonNull(indicesOptions);
this.client = client;
}
@ -115,7 +119,7 @@ public class DatafeedDelayedDataDetector implements DelayedDataDetector {
.fixedInterval(new DateHistogramInterval(bucketSpan + "ms")).field(timeField))
.query(ExtractorUtils.wrapInTimeRangeQuery(datafeedQuery, timeField, start, end));
SearchRequest searchRequest = new SearchRequest(datafeedIndices).source(searchSourceBuilder);
SearchRequest searchRequest = new SearchRequest(datafeedIndices).source(searchSourceBuilder).indicesOptions(indicesOptions);
try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashWithOrigin(ML_ORIGIN)) {
SearchResponse response = client.execute(SearchAction.INSTANCE, searchRequest).actionGet();
List<? extends Histogram.Bucket> buckets = ((Histogram)response.getAggregations().get(DATE_BUCKETS)).getBuckets();


@ -51,6 +51,7 @@ public class DelayedDataDetectorFactory {
job.getDataDescription().getTimeField(),
datafeedConfig.getParsedQuery(xContentRegistry),
datafeedConfig.getIndices().toArray(new String[0]),
datafeedConfig.getIndicesOptions(),
client);
} else {
return new NullDelayedDataDetector();


@ -79,7 +79,7 @@ public interface DataExtractorFactory {
client,
ClientHelper.ML_ORIGIN,
GetRollupIndexCapsAction.INSTANCE,
new GetRollupIndexCapsAction.Request(datafeed.getIndices().toArray(new String[0])),
new GetRollupIndexCapsAction.Request(datafeed.getIndices().toArray(new String[0]), datafeed.getIndicesOptions()),
getRollupIndexCapsActionHandler);
}
}


@ -28,6 +28,7 @@ class AggregationDataExtractor extends AbstractAggregationDataExtractor<SearchRe
protected SearchRequestBuilder buildSearchRequest(SearchSourceBuilder searchSourceBuilder) {
return new SearchRequestBuilder(client, SearchAction.INSTANCE)
.setSource(searchSourceBuilder)
.setIndicesOptions(context.indicesOptions)
.setIndices(context.indices);
}
}


@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.ml.datafeed.extractor.aggregation;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.aggregations.AggregatorFactories;
@ -25,10 +26,11 @@ class AggregationDataExtractorContext {
final long end;
final boolean includeDocCount;
final Map<String, String> headers;
final IndicesOptions indicesOptions;
AggregationDataExtractorContext(String jobId, String timeField, Set<String> fields, List<String> indices, QueryBuilder query,
AggregatorFactories.Builder aggs, long start, long end, boolean includeDocCount,
Map<String, String> headers) {
Map<String, String> headers, IndicesOptions indicesOptions) {
this.jobId = Objects.requireNonNull(jobId);
this.timeField = Objects.requireNonNull(timeField);
this.fields = Objects.requireNonNull(fields);
@ -39,5 +41,6 @@ class AggregationDataExtractorContext {
this.end = end;
this.includeDocCount = includeDocCount;
this.headers = headers;
this.indicesOptions = Objects.requireNonNull(indicesOptions);
}
}


@ -50,7 +50,8 @@ public class AggregationDataExtractorFactory implements DataExtractorFactory {
Intervals.alignToCeil(start, histogramInterval),
Intervals.alignToFloor(end, histogramInterval),
job.getAnalysisConfig().getSummaryCountFieldName().equals(DatafeedConfig.DOC_COUNT),
datafeedConfig.getHeaders());
datafeedConfig.getHeaders(),
datafeedConfig.getIndicesOptions());
return new AggregationDataExtractor(client, dataExtractorContext, timingStatsReporter);
}
}


@ -26,7 +26,9 @@ class RollupDataExtractor extends AbstractAggregationDataExtractor<RollupSearchA
@Override
protected RollupSearchAction.RequestBuilder buildSearchRequest(SearchSourceBuilder searchSourceBuilder) {
SearchRequest searchRequest = new SearchRequest().indices(context.indices).source(searchSourceBuilder);
SearchRequest searchRequest = new SearchRequest().indices(context.indices)
.indicesOptions(context.indicesOptions)
.source(searchSourceBuilder);
return new RollupSearchAction.RequestBuilder(client, searchRequest);
}


@ -74,7 +74,8 @@ public class RollupDataExtractorFactory implements DataExtractorFactory {
Intervals.alignToCeil(start, histogramInterval),
Intervals.alignToFloor(end, histogramInterval),
job.getAnalysisConfig().getSummaryCountFieldName().equals(DatafeedConfig.DOC_COUNT),
datafeedConfig.getHeaders());
datafeedConfig.getHeaders(),
datafeedConfig.getIndicesOptions());
return new RollupDataExtractor(client, dataExtractorContext, timingStatsReporter);
}


@ -251,12 +251,15 @@ public class ChunkedDataExtractor implements DataExtractor {
private SearchRequestBuilder rangeSearchRequest() {
return new SearchRequestBuilder(client, SearchAction.INSTANCE)
.setIndices(context.indices)
.setIndicesOptions(context.indicesOptions)
.setSource(rangeSearchBuilder())
.setTrackTotalHits(true);
}
private RollupSearchAction.RequestBuilder rollupRangeSearchRequest() {
SearchRequest searchRequest = new SearchRequest().indices(context.indices).source(rangeSearchBuilder());
SearchRequest searchRequest = new SearchRequest().indices(context.indices)
.indicesOptions(context.indicesOptions)
.source(rangeSearchBuilder());
return new RollupSearchAction.RequestBuilder(client, searchRequest);
}
}


@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.ml.datafeed.extractor.chunked;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
@ -32,10 +33,11 @@ class ChunkedDataExtractorContext {
final Map<String, String> headers;
final boolean hasAggregations;
final Long histogramInterval;
final IndicesOptions indicesOptions;
ChunkedDataExtractorContext(String jobId, String timeField, List<String> indices, QueryBuilder query, int scrollSize, long start,
long end, @Nullable TimeValue chunkSpan, TimeAligner timeAligner, Map<String, String> headers,
boolean hasAggregations, @Nullable Long histogramInterval) {
boolean hasAggregations, @Nullable Long histogramInterval, IndicesOptions indicesOptions) {
this.jobId = Objects.requireNonNull(jobId);
this.timeField = Objects.requireNonNull(timeField);
this.indices = indices.toArray(new String[indices.size()]);
@ -48,5 +50,6 @@ class ChunkedDataExtractorContext {
this.headers = headers;
this.hasAggregations = hasAggregations;
this.histogramInterval = histogramInterval;
this.indicesOptions = Objects.requireNonNull(indicesOptions);
}
}


@ -54,7 +54,8 @@ public class ChunkedDataExtractorFactory implements DataExtractorFactory {
timeAligner,
datafeedConfig.getHeaders(),
datafeedConfig.hasAggregations(),
datafeedConfig.hasAggregations() ? datafeedConfig.getHistogramIntervalMillis(xContentRegistry) : null
datafeedConfig.hasAggregations() ? datafeedConfig.getHistogramIntervalMillis(xContentRegistry) : null,
datafeedConfig.getIndicesOptions()
);
return new ChunkedDataExtractor(client, dataExtractorFactory, dataExtractorContext, timingStatsReporter);
}


@ -125,6 +125,7 @@ class ScrollDataExtractor implements DataExtractor {
.setScroll(SCROLL_TIMEOUT)
.addSort(context.extractedFields.timeField(), SortOrder.ASC)
.setIndices(context.indices)
.setIndicesOptions(context.indicesOptions)
.setSize(context.scrollSize)
.setQuery(ExtractorUtils.wrapInTimeRangeQuery(
context.query, context.extractedFields.timeField(), start, context.end));


@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.ml.datafeed.extractor.scroll;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
@ -23,10 +24,11 @@ class ScrollDataExtractorContext {
final long start;
final long end;
final Map<String, String> headers;
final IndicesOptions indicesOptions;
ScrollDataExtractorContext(String jobId, TimeBasedExtractedFields extractedFields, List<String> indices, QueryBuilder query,
List<SearchSourceBuilder.ScriptField> scriptFields, int scrollSize, long start, long end,
Map<String, String> headers) {
Map<String, String> headers, IndicesOptions indicesOptions) {
this.jobId = Objects.requireNonNull(jobId);
this.extractedFields = Objects.requireNonNull(extractedFields);
this.indices = indices.toArray(new String[indices.size()]);
@ -36,5 +38,6 @@ class ScrollDataExtractorContext {
this.start = start;
this.end = end;
this.headers = headers;
this.indicesOptions = Objects.requireNonNull(indicesOptions);
}
}


@ -25,7 +25,6 @@ import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
import java.util.Objects;
public class ScrollDataExtractorFactory implements DataExtractorFactory {
private final Client client;
private final DatafeedConfig datafeedConfig;
private final Job job;
@ -54,7 +53,9 @@ public class ScrollDataExtractorFactory implements DataExtractorFactory {
datafeedConfig.getScrollSize(),
start,
end,
datafeedConfig.getHeaders());
datafeedConfig.getHeaders(),
datafeedConfig.getIndicesOptions()
);
return new ScrollDataExtractor(client, dataExtractorContext, timingStatsReporter);
}
@ -87,11 +88,13 @@ public class ScrollDataExtractorFactory implements DataExtractorFactory {
// Step 1. Get field capabilities necessary to build the information of how to extract fields
FieldCapabilitiesRequest fieldCapabilitiesRequest = new FieldCapabilitiesRequest();
fieldCapabilitiesRequest.indices(datafeed.getIndices().toArray(new String[datafeed.getIndices().size()]));
fieldCapabilitiesRequest.indices(datafeed.getIndices().toArray(new String[0])).indicesOptions(datafeed.getIndicesOptions());
// We need capabilities for all fields matching the requested fields' parents so that we can work around
// multi-fields that are not in source.
String[] requestFields = job.allInputFields().stream().map(f -> MlStrings.getParentField(f) + "*")
.toArray(size -> new String[size]);
String[] requestFields = job.allInputFields()
.stream()
.map(f -> MlStrings.getParentField(f) + "*")
.toArray(String[]::new);
fieldCapabilitiesRequest.fields(requestFields);
ClientHelper.<FieldCapabilitiesResponse> executeWithHeaders(datafeed.getHeaders(), ClientHelper.ML_ORIGIN, client, () -> {
client.execute(FieldCapabilitiesAction.INSTANCE, fieldCapabilitiesRequest, fieldCapabilitiesHandler);
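
The same propagation covers field capabilities resolution; a compact sketch of the request the factory now issues (same calls as the hunk above, `datafeed` assumed in scope):

    FieldCapabilitiesRequest fieldCapsRequest = new FieldCapabilitiesRequest();
    fieldCapsRequest.indices(datafeed.getIndices().toArray(new String[0]))
        .indicesOptions(datafeed.getIndicesOptions());  // resolve fields against the same index set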


@ -5,6 +5,8 @@
*/
package org.elasticsearch.xpack.ml.rest.datafeeds;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.rest.BaseRestHandler;
@ -44,8 +46,9 @@ public class RestPutDatafeedAction extends BaseRestHandler {
@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
String datafeedId = restRequest.param(DatafeedConfig.ID.getPreferredName());
IndicesOptions indicesOptions = IndicesOptions.fromRequest(restRequest, SearchRequest.DEFAULT_INDICES_OPTIONS);
XContentParser parser = restRequest.contentParser();
PutDatafeedAction.Request putDatafeedRequest = PutDatafeedAction.Request.parseRequest(datafeedId, parser);
PutDatafeedAction.Request putDatafeedRequest = PutDatafeedAction.Request.parseRequest(datafeedId, indicesOptions, parser);
putDatafeedRequest.timeout(restRequest.paramAsTime("timeout", putDatafeedRequest.timeout()));
putDatafeedRequest.masterNodeTimeout(restRequest.paramAsTime("master_timeout", putDatafeedRequest.masterNodeTimeout()));
return channel -> client.execute(PutDatafeedAction.INSTANCE, putDatafeedRequest, new RestToXContentListener<>(channel));
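
When none of the indices-related URL parameters are present, `IndicesOptions.fromRequest` returns the supplied default unchanged. That default is worth spelling out, since it is what the node-selector error messages later in this change print (a sketch; the comment values are taken from those messages):

    IndicesOptions defaults = SearchRequest.DEFAULT_INDICES_OPTIONS;
    // ignore_unavailable=false, allow_no_indices=true, expand_wildcards_open=true,
    // expand_wildcards_closed=false, forbid_closed_indices=true, ignore_throttled=true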


@ -5,6 +5,8 @@
*/
package org.elasticsearch.xpack.ml.rest.datafeeds;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.rest.BaseRestHandler;
@ -44,8 +46,15 @@ public class RestUpdateDatafeedAction extends BaseRestHandler {
@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
String datafeedId = restRequest.param(DatafeedConfig.ID.getPreferredName());
IndicesOptions indicesOptions = null;
if (restRequest.hasParam("expand_wildcards")
|| restRequest.hasParam("ignore_unavailable")
|| restRequest.hasParam("allow_no_indices")
|| restRequest.hasParam("ignore_throttled")) {
indicesOptions = IndicesOptions.fromRequest(restRequest, SearchRequest.DEFAULT_INDICES_OPTIONS);
}
XContentParser parser = restRequest.contentParser();
UpdateDatafeedAction.Request updateDatafeedRequest = UpdateDatafeedAction.Request.parseRequest(datafeedId, parser);
UpdateDatafeedAction.Request updateDatafeedRequest = UpdateDatafeedAction.Request.parseRequest(datafeedId, indicesOptions, parser);
updateDatafeedRequest.timeout(restRequest.paramAsTime("timeout", updateDatafeedRequest.timeout()));
updateDatafeedRequest.masterNodeTimeout(restRequest.paramAsTime("master_timeout", updateDatafeedRequest.masterNodeTimeout()));


@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml.datafeed;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
@ -75,10 +76,19 @@ public class DatafeedNodeSelectorTests extends ESTestCase {
givenClusterState("foo", 1, 0);
PersistentTasksCustomMetaData.Assignment result =
new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).selectNode();
PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState,
resolver,
df.getId(),
df.getJobId(),
df.getIndices(),
SearchRequest.DEFAULT_INDICES_OPTIONS).selectNode();
assertEquals("node_id", result.getExecutorNode());
new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).checkDatafeedTaskCanBeCreated();
new DatafeedNodeSelector(clusterState,
resolver,
df.getId(),
df.getJobId(),
df.getIndices(),
SearchRequest.DEFAULT_INDICES_OPTIONS).checkDatafeedTaskCanBeCreated();
}
public void testSelectNode_GivenJobIsOpening() {
@ -91,10 +101,19 @@ public class DatafeedNodeSelectorTests extends ESTestCase {
givenClusterState("foo", 1, 0);
PersistentTasksCustomMetaData.Assignment result =
new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).selectNode();
PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState,
resolver,
df.getId(),
df.getJobId(),
df.getIndices(),
SearchRequest.DEFAULT_INDICES_OPTIONS).selectNode();
assertEquals("node_id", result.getExecutorNode());
new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).checkDatafeedTaskCanBeCreated();
new DatafeedNodeSelector(clusterState,
resolver,
df.getId(),
df.getJobId(),
df.getIndices(),
SearchRequest.DEFAULT_INDICES_OPTIONS).checkDatafeedTaskCanBeCreated();
}
public void testNoJobTask() {
@ -107,15 +126,23 @@ public class DatafeedNodeSelectorTests extends ESTestCase {
givenClusterState("foo", 1, 0);
PersistentTasksCustomMetaData.Assignment result =
new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).selectNode();
PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState,
resolver,
df.getId(),
df.getJobId(),
df.getIndices(),
SearchRequest.DEFAULT_INDICES_OPTIONS).selectNode();
assertNull(result.getExecutorNode());
assertThat(result.getExplanation(), equalTo("cannot start datafeed [datafeed_id], because the job's [job_id] state is " +
"[closed] while state [opened] is required"));
ElasticsearchException e = expectThrows(ElasticsearchException.class,
() -> new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices())
.checkDatafeedTaskCanBeCreated());
() -> new DatafeedNodeSelector(clusterState,
resolver,
df.getId(),
df.getJobId(),
df.getIndices(),
SearchRequest.DEFAULT_INDICES_OPTIONS).checkDatafeedTaskCanBeCreated());
assertThat(e.getMessage(), containsString("No node found to start datafeed [datafeed_id], allocation explanation "
+ "[cannot start datafeed [datafeed_id], because the job's [job_id] state is [closed] while state [opened] is required]"));
}
@ -131,15 +158,23 @@ public class DatafeedNodeSelectorTests extends ESTestCase {
givenClusterState("foo", 1, 0);
PersistentTasksCustomMetaData.Assignment result =
new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).selectNode();
PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState,
resolver,
df.getId(),
df.getJobId(),
df.getIndices(),
SearchRequest.DEFAULT_INDICES_OPTIONS).selectNode();
assertNull(result.getExecutorNode());
assertEquals("cannot start datafeed [datafeed_id], because the job's [job_id] state is [" + jobState +
"] while state [opened] is required", result.getExplanation());
ElasticsearchException e = expectThrows(ElasticsearchException.class,
() -> new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices())
.checkDatafeedTaskCanBeCreated());
() -> new DatafeedNodeSelector(clusterState,
resolver,
df.getId(),
df.getJobId(),
df.getIndices(),
SearchRequest.DEFAULT_INDICES_OPTIONS).checkDatafeedTaskCanBeCreated());
assertThat(e.getMessage(), containsString("No node found to start datafeed [datafeed_id], allocation explanation "
+ "[cannot start datafeed [datafeed_id], because the job's [job_id] state is [" + jobState
+ "] while state [opened] is required]"));
@ -160,13 +195,22 @@ public class DatafeedNodeSelectorTests extends ESTestCase {
givenClusterState("foo", 1, 0, states);
PersistentTasksCustomMetaData.Assignment result =
new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).selectNode();
PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState,
resolver,
df.getId(),
df.getJobId(),
df.getIndices(),
SearchRequest.DEFAULT_INDICES_OPTIONS).selectNode();
assertNull(result.getExecutorNode());
assertThat(result.getExplanation(), equalTo("cannot start datafeed [datafeed_id] because index [foo] " +
"does not have all primary shards active yet."));
new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).checkDatafeedTaskCanBeCreated();
new DatafeedNodeSelector(clusterState,
resolver,
df.getId(),
df.getJobId(),
df.getIndices(),
SearchRequest.DEFAULT_INDICES_OPTIONS).checkDatafeedTaskCanBeCreated();
}
public void testShardNotAllActive() {
@ -185,13 +229,22 @@ public class DatafeedNodeSelectorTests extends ESTestCase {
givenClusterState("foo", 2, 0, states);
PersistentTasksCustomMetaData.Assignment result =
new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).selectNode();
PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState,
resolver,
df.getId(),
df.getJobId(),
df.getIndices(),
SearchRequest.DEFAULT_INDICES_OPTIONS).selectNode();
assertNull(result.getExecutorNode());
assertThat(result.getExplanation(), equalTo("cannot start datafeed [datafeed_id] because index [foo] " +
"does not have all primary shards active yet."));
new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).checkDatafeedTaskCanBeCreated();
new DatafeedNodeSelector(clusterState,
resolver,
df.getId(),
df.getJobId(),
df.getIndices(),
SearchRequest.DEFAULT_INDICES_OPTIONS).checkDatafeedTaskCanBeCreated();
}
public void testIndexDoesntExist() {
@ -204,17 +257,32 @@ public class DatafeedNodeSelectorTests extends ESTestCase {
givenClusterState("foo", 1, 0);
PersistentTasksCustomMetaData.Assignment result =
new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).selectNode();
PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState,
resolver,
df.getId(),
df.getJobId(),
df.getIndices(),
SearchRequest.DEFAULT_INDICES_OPTIONS).selectNode();
assertNull(result.getExecutorNode());
assertThat(result.getExplanation(), equalTo("cannot start datafeed [datafeed_id] because index [not_foo] " +
"does not exist, is closed, or is still initializing."));
assertThat(result.getExplanation(),
equalTo("cannot start datafeed [datafeed_id] because it failed resolving indices given [not_foo] and " +
"indices_options [IndicesOptions[ignore_unavailable=false, allow_no_indices=true, expand_wildcards_open=true, " +
"expand_wildcards_closed=false, expand_wildcards_hidden=false, allow_aliases_to_multiple_indices=true, " +
"forbid_closed_indices=true, ignore_aliases=false, ignore_throttled=true]] with exception [no such index [not_foo]]"));
ElasticsearchException e = expectThrows(ElasticsearchException.class,
() -> new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices())
.checkDatafeedTaskCanBeCreated());
() -> new DatafeedNodeSelector(clusterState,
resolver,
df.getId(),
df.getJobId(),
df.getIndices(),
SearchRequest.DEFAULT_INDICES_OPTIONS).checkDatafeedTaskCanBeCreated());
assertThat(e.getMessage(), containsString("No node found to start datafeed [datafeed_id], allocation explanation "
+ "[cannot start datafeed [datafeed_id] because index [not_foo] does not exist, is closed, or is still initializing.]"));
+ "[cannot start datafeed [datafeed_id] because it failed resolving " +
"indices given [not_foo] and indices_options [IndicesOptions[ignore_unavailable=false, allow_no_indices=true, " +
"expand_wildcards_open=true, expand_wildcards_closed=false, expand_wildcards_hidden=false, " +
"allow_aliases_to_multiple_indices=true, forbid_closed_indices=true, ignore_aliases=false, ignore_throttled=true]] " +
"with exception [no such index [not_foo]]]"));
}
public void testRemoteIndex() {
@ -227,8 +295,12 @@ public class DatafeedNodeSelectorTests extends ESTestCase {
givenClusterState("foo", 1, 0);
PersistentTasksCustomMetaData.Assignment result =
new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).selectNode();
PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState,
resolver,
df.getId(),
df.getJobId(),
df.getIndices(),
SearchRequest.DEFAULT_INDICES_OPTIONS).selectNode();
assertNotNull(result.getExecutorNode());
}
@ -245,15 +317,23 @@ public class DatafeedNodeSelectorTests extends ESTestCase {
givenClusterState("foo", 1, 0);
PersistentTasksCustomMetaData.Assignment result =
new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).selectNode();
PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState,
resolver,
df.getId(),
df.getJobId(),
df.getIndices(),
SearchRequest.DEFAULT_INDICES_OPTIONS).selectNode();
assertNull(result.getExecutorNode());
assertEquals("cannot start datafeed [datafeed_id], because the job's [job_id] state is stale",
result.getExplanation());
ElasticsearchException e = expectThrows(ElasticsearchException.class,
() -> new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices())
.checkDatafeedTaskCanBeCreated());
() -> new DatafeedNodeSelector(clusterState,
resolver,
df.getId(),
df.getJobId(),
df.getIndices(),
SearchRequest.DEFAULT_INDICES_OPTIONS).checkDatafeedTaskCanBeCreated());
assertThat(e.getMessage(), containsString("No node found to start datafeed [datafeed_id], allocation explanation "
+ "[cannot start datafeed [datafeed_id], because the job's [job_id] state is stale]"));
@ -261,9 +341,19 @@ public class DatafeedNodeSelectorTests extends ESTestCase {
addJobTask(job.getId(), "node_id1", JobState.OPENED, tasksBuilder);
tasks = tasksBuilder.build();
givenClusterState("foo", 1, 0);
result = new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).selectNode();
result = new DatafeedNodeSelector(clusterState,
resolver,
df.getId(),
df.getJobId(),
df.getIndices(),
SearchRequest.DEFAULT_INDICES_OPTIONS).selectNode();
assertEquals("node_id1", result.getExecutorNode());
new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).checkDatafeedTaskCanBeCreated();
new DatafeedNodeSelector(clusterState,
resolver,
df.getId(),
df.getJobId(),
df.getIndices(),
SearchRequest.DEFAULT_INDICES_OPTIONS).checkDatafeedTaskCanBeCreated();
}
public void testSelectNode_GivenJobOpeningAndIndexDoesNotExist() {
@ -280,10 +370,17 @@ public class DatafeedNodeSelectorTests extends ESTestCase {
givenClusterState("foo", 1, 0);
ElasticsearchException e = expectThrows(ElasticsearchException.class,
() -> new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices())
.checkDatafeedTaskCanBeCreated());
() -> new DatafeedNodeSelector(clusterState,
resolver,
df.getId(),
df.getJobId(),
df.getIndices(),
SearchRequest.DEFAULT_INDICES_OPTIONS).checkDatafeedTaskCanBeCreated());
assertThat(e.getMessage(), containsString("No node found to start datafeed [datafeed_id], allocation explanation "
+ "[cannot start datafeed [datafeed_id] because index [not_foo] does not exist, is closed, or is still initializing.]"));
+ "[cannot start datafeed [datafeed_id] because it failed resolving indices given [not_foo] and " +
"indices_options [IndicesOptions[ignore_unavailable=false, allow_no_indices=true, expand_wildcards_open=true, " +
"expand_wildcards_closed=false, expand_wildcards_hidden=false, allow_aliases_to_multiple_indices=true, " +
"forbid_closed_indices=true, ignore_aliases=false, ignore_throttled=true]] with exception [no such index [not_foo]]]"));
}
public void testSelectNode_GivenMlUpgradeMode() {
@ -297,8 +394,12 @@ public class DatafeedNodeSelectorTests extends ESTestCase {
givenClusterState("foo", 1, 0);
PersistentTasksCustomMetaData.Assignment result =
new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).selectNode();
PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState,
resolver,
df.getId(),
df.getJobId(),
df.getIndices(),
SearchRequest.DEFAULT_INDICES_OPTIONS).selectNode();
assertThat(result, equalTo(MlTasks.AWAITING_UPGRADE));
}
@ -314,8 +415,12 @@ public class DatafeedNodeSelectorTests extends ESTestCase {
givenClusterState("foo", 1, 0);
ElasticsearchException e = expectThrows(ElasticsearchException.class,
() -> new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices())
.checkDatafeedTaskCanBeCreated());
() -> new DatafeedNodeSelector(clusterState,
resolver,
df.getId(),
df.getJobId(),
df.getIndices(),
SearchRequest.DEFAULT_INDICES_OPTIONS).checkDatafeedTaskCanBeCreated());
assertThat(e.getMessage(), equalTo("Could not start datafeed [datafeed_id] as indices are being upgraded"));
}


@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.ml.datafeed.extractor.aggregation;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
@ -272,7 +273,7 @@ public class AggregationDataExtractorTests extends ESTestCase {
private AggregationDataExtractorContext createContext(long start, long end) {
return new AggregationDataExtractorContext(jobId, timeField, fields, indices, query, aggs, start, end, true,
Collections.emptyMap());
Collections.emptyMap(), SearchRequest.DEFAULT_INDICES_OPTIONS);
}
@SuppressWarnings("unchecked")


@ -565,7 +565,8 @@ public class ChunkedDataExtractorTests extends ESTestCase {
private ChunkedDataExtractorContext createContext(long start, long end, boolean hasAggregations, Long histogramInterval) {
return new ChunkedDataExtractorContext(jobId, timeField, indices, query, scrollSize, start, end, chunkSpan,
ChunkedDataExtractorFactory.newIdentityTimeAligner(), Collections.emptyMap(), hasAggregations, histogramInterval);
ChunkedDataExtractorFactory.newIdentityTimeAligner(), Collections.emptyMap(), hasAggregations, histogramInterval,
SearchRequest.DEFAULT_INDICES_OPTIONS);
}
private static class StubSubExtractor implements DataExtractor {


@ -11,6 +11,7 @@ import org.elasticsearch.action.search.ClearScrollAction;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.ClearScrollResponse;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
@ -444,7 +445,7 @@ public class ScrollDataExtractorTests extends ESTestCase {
List<SearchSourceBuilder.ScriptField> sFields = Arrays.asList(withoutSplit, withSplit);
ScrollDataExtractorContext context = new ScrollDataExtractorContext(jobId, extractedFields, indices,
query, sFields, scrollSize, 1000, 2000, Collections.emptyMap());
query, sFields, scrollSize, 1000, 2000, Collections.emptyMap(), SearchRequest.DEFAULT_INDICES_OPTIONS);
TestDataExtractor extractor = new TestDataExtractor(context);
@ -490,7 +491,7 @@ public class ScrollDataExtractorTests extends ESTestCase {
private ScrollDataExtractorContext createContext(long start, long end) {
return new ScrollDataExtractorContext(jobId, extractedFields, indices, query, scriptFields, scrollSize, start, end,
Collections.emptyMap());
Collections.emptyMap(), SearchRequest.DEFAULT_INDICES_OPTIONS);
}
private SearchResponse createEmptySearchResponse() {


@ -23,6 +23,31 @@
"body":{
"description":"The datafeed config",
"required":true
},
"params": {
"ignore_unavailable":{
"type":"boolean",
"description":"Ignore unavailable indexes (default: false)"
},
"allow_no_indices":{
"type":"boolean",
"description":"Ignore if the source indices expressions resolves to no concrete indices (default: true)"
},
"ignore_throttled":{
"type":"boolean",
"description":"Ignore indices that are marked as throttled (default: true)"
},
"expand_wildcards":{
"type":"enum",
"options":[
"open",
"closed",
"hidden",
"none",
"all"
],
"description":"Whether source index expressions should get expanded to open or closed indices (default: open)"
}
}
}
}
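
With the specification in place, clients can set the options either as URL parameters or inline in the request body. A minimal high-level REST client sketch of the body form (the ids and index pattern are hypothetical; `setIndicesOptions` is the builder method added in this change):

    DatafeedConfig datafeed = DatafeedConfig.builder("my-datafeed", "my-job")
        .setIndices("index-foo-*")
        .setIndicesOptions(IndicesOptions.fromOptions(
            false,  // ignore_unavailable
            true,   // allow_no_indices
            true,   // expand wildcards to open indices
            true))  // ...and to closed indices
        .build();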


@ -23,6 +23,31 @@
"body":{
"description":"The datafeed update settings",
"required":true
},
"params": {
"ignore_unavailable":{
"type":"boolean",
"description":"Ignore unavailable indexes (default: false)"
},
"allow_no_indices":{
"type":"boolean",
"description":"Ignore if the source indices expressions resolves to no concrete indices (default: true)"
},
"ignore_throttled":{
"type":"boolean",
"description":"Ignore indices that are marked as throttled (default: true)"
},
"expand_wildcards":{
"type":"enum",
"options":[
"open",
"closed",
"hidden",
"none",
"all"
],
"description":"Whether source index expressions should get expanded to open or closed indices (default: open)"
}
}
}
}


@ -415,3 +415,78 @@ setup:
datafeed_id: test-datafeed-1
force: true
- match: { acknowledged: true }
---
"Test put and update datafeed with indices options":
- do:
ml.put_datafeed:
datafeed_id: test-datafeed-indices-options-1
body: >
{
"job_id":"datafeeds-crud-1",
"indexes":["index-foo"],
"indices_options": {
"expand_wildcards": ["closed", "open"],
"ignore_throttled": false
}
}
- match: { datafeed_id: "test-datafeed-indices-options-1" }
- do:
ml.get_datafeeds:
datafeed_id: test-datafeed-indices-options-1
- match: { datafeeds.0.indices_options.ignore_throttled: false }
- length: { datafeeds.0.indices_options.expand_wildcards: 2 }
- match: { datafeeds.0.indices_options.expand_wildcards.0: open }
- match: { datafeeds.0.indices_options.expand_wildcards.1: closed }
- do:
ml.update_datafeed:
datafeed_id: test-datafeed-indices-options-1
body: >
{
"indices_options": {
"ignore_throttled": true
}
}
- match: { datafeed_id: "test-datafeed-indices-options-1" }
- do:
ml.get_datafeeds:
datafeed_id: test-datafeed-indices-options-1
- match: { datafeeds.0.indices_options.ignore_throttled: true }
---
"Test put and update datafeed with indices options in params":
- do:
ml.put_datafeed:
datafeed_id: test-datafeed-indices-options-params-1
ignore_throttled: false
body: >
{
"job_id":"datafeeds-crud-1",
"indexes":["index-foo"]
}
- match: { datafeed_id: "test-datafeed-indices-options-params-1" }
- do:
ml.get_datafeeds:
datafeed_id: test-datafeed-indices-options-params-1
- match: { datafeeds.0.indices_options.ignore_throttled: false }
- do:
ml.update_datafeed:
datafeed_id: test-datafeed-indices-options-params-1
ignore_throttled: false
allow_no_indices: false
body: >
{
}
- match: { datafeed_id: "test-datafeed-indices-options-params-1" }
- do:
ml.get_datafeeds:
datafeed_id: test-datafeed-indices-options-params-1
- match: { datafeeds.0.indices_options.ignore_throttled: false }
- match: { datafeeds.0.indices_options.allow_no_indices: false }
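
On the wire, `indices_options` is an ordinary JSON object; a hedged sketch of how such a body parses back into an `IndicesOptions` (mirroring the client parser's use of `IndicesOptions.fromMap`; the literal values echo the first test above, `java.util` imports assumed):

    Map<String, Object> body = new HashMap<>();
    body.put("expand_wildcards", Arrays.asList("closed", "open"));
    body.put("ignore_throttled", false);
    IndicesOptions fromBody = IndicesOptions.fromMap(body,
        SearchRequest.DEFAULT_INDICES_OPTIONS);  // fallback for any absent keys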


@ -2,7 +2,6 @@ setup:
- skip:
version: "all"
reason: "AwaitsFix https://github.com/elastic/elasticsearch/issues/42258"
---
"Put job and datafeed without aggs in old cluster":


@ -2,7 +2,6 @@ setup:
- skip:
version: "all"
reason: "AwaitsFix https://github.com/elastic/elasticsearch/issues/42258"
- do:
cluster.health:
wait_for_status: green