diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/datafeed/DatafeedConfig.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/datafeed/DatafeedConfig.java index f192b420eba..0b53c2e1852 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/datafeed/DatafeedConfig.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/datafeed/DatafeedConfig.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.client.ml.datafeed; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.ml.job.config.Job; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.bytes.BytesArray; @@ -63,6 +64,7 @@ public class DatafeedConfig implements ToXContentObject { public static final ParseField CHUNKING_CONFIG = new ParseField("chunking_config"); public static final ParseField DELAYED_DATA_CHECK_CONFIG = new ParseField("delayed_data_check_config"); public static final ParseField MAX_EMPTY_SEARCHES = new ParseField("max_empty_searches"); + public static final ParseField INDICES_OPTIONS = new ParseField("indices_options"); public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( "datafeed_config", true, a -> new Builder((String)a[0], (String)a[1])); @@ -90,6 +92,9 @@ public class DatafeedConfig implements ToXContentObject { PARSER.declareObject(Builder::setChunkingConfig, ChunkingConfig.PARSER, CHUNKING_CONFIG); PARSER.declareObject(Builder::setDelayedDataCheckConfig, DelayedDataCheckConfig.PARSER, DELAYED_DATA_CHECK_CONFIG); PARSER.declareInt(Builder::setMaxEmptySearches, MAX_EMPTY_SEARCHES); + PARSER.declareObject(Builder::setIndicesOptions, + (p, c) -> IndicesOptions.fromMap(p.map(), new IndicesOptions(IndicesOptions.Option.NONE, IndicesOptions.WildcardStates.NONE)), + INDICES_OPTIONS); } private static BytesReference parseBytes(XContentParser parser) throws IOException { @@ -110,11 +115,12 @@ public class DatafeedConfig implements ToXContentObject { private final ChunkingConfig chunkingConfig; private final DelayedDataCheckConfig delayedDataCheckConfig; private final Integer maxEmptySearches; + private final IndicesOptions indicesOptions; private DatafeedConfig(String id, String jobId, TimeValue queryDelay, TimeValue frequency, List indices, BytesReference query, BytesReference aggregations, List scriptFields, Integer scrollSize, ChunkingConfig chunkingConfig, DelayedDataCheckConfig delayedDataCheckConfig, - Integer maxEmptySearches) { + Integer maxEmptySearches, IndicesOptions indicesOptions) { this.id = id; this.jobId = jobId; this.queryDelay = queryDelay; @@ -127,6 +133,7 @@ public class DatafeedConfig implements ToXContentObject { this.chunkingConfig = chunkingConfig; this.delayedDataCheckConfig = delayedDataCheckConfig; this.maxEmptySearches = maxEmptySearches; + this.indicesOptions = indicesOptions; } public String getId() { @@ -177,6 +184,10 @@ public class DatafeedConfig implements ToXContentObject { return maxEmptySearches; } + public IndicesOptions getIndicesOptions() { + return indicesOptions; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); @@ -216,6 +227,11 @@ public class DatafeedConfig implements ToXContentObject { if (maxEmptySearches != null) { builder.field(MAX_EMPTY_SEARCHES.getPreferredName(), maxEmptySearches); } + if (indicesOptions != null) { + builder.startObject(INDICES_OPTIONS.getPreferredName()); + indicesOptions.toXContent(builder, 
params); + builder.endObject(); + } builder.endObject(); return builder; @@ -257,7 +273,8 @@ public class DatafeedConfig implements ToXContentObject { && Objects.equals(this.scriptFields, that.scriptFields) && Objects.equals(this.chunkingConfig, that.chunkingConfig) && Objects.equals(this.delayedDataCheckConfig, that.delayedDataCheckConfig) - && Objects.equals(this.maxEmptySearches, that.maxEmptySearches); + && Objects.equals(this.maxEmptySearches, that.maxEmptySearches) + && Objects.equals(this.indicesOptions, that.indicesOptions); } /** @@ -268,7 +285,7 @@ public class DatafeedConfig implements ToXContentObject { @Override public int hashCode() { return Objects.hash(id, jobId, frequency, queryDelay, indices, asMap(query), scrollSize, asMap(aggregations), scriptFields, - chunkingConfig, delayedDataCheckConfig, maxEmptySearches); + chunkingConfig, delayedDataCheckConfig, maxEmptySearches, indicesOptions); } public static Builder builder(String id, String jobId) { @@ -289,6 +306,7 @@ public class DatafeedConfig implements ToXContentObject { private ChunkingConfig chunkingConfig; private DelayedDataCheckConfig delayedDataCheckConfig; private Integer maxEmptySearches; + private IndicesOptions indicesOptions; public Builder(String id, String jobId) { this.id = Objects.requireNonNull(id, ID.getPreferredName()); @@ -308,6 +326,7 @@ public class DatafeedConfig implements ToXContentObject { this.chunkingConfig = config.chunkingConfig; this.delayedDataCheckConfig = config.getDelayedDataCheckConfig(); this.maxEmptySearches = config.getMaxEmptySearches(); + this.indicesOptions = config.indicesOptions; } public Builder setIndices(List indices) { @@ -395,9 +414,14 @@ public class DatafeedConfig implements ToXContentObject { return this; } + public Builder setIndicesOptions(IndicesOptions indicesOptions) { + this.indicesOptions = indicesOptions; + return this; + } + public DatafeedConfig build() { return new DatafeedConfig(id, jobId, queryDelay, frequency, indices, query, aggregations, scriptFields, scrollSize, - chunkingConfig, delayedDataCheckConfig, maxEmptySearches); + chunkingConfig, delayedDataCheckConfig, maxEmptySearches, indicesOptions); } private static BytesReference xContentToBytes(ToXContentObject object) throws IOException { diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/datafeed/DatafeedUpdate.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/datafeed/DatafeedUpdate.java index e11e2e1d9b3..3acdda37615 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/datafeed/DatafeedUpdate.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/datafeed/DatafeedUpdate.java @@ -19,6 +19,7 @@ package org.elasticsearch.client.ml.datafeed; import org.elasticsearch.client.ml.job.config.Job; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; @@ -80,6 +81,9 @@ public class DatafeedUpdate implements ToXContentObject { DelayedDataCheckConfig.PARSER, DatafeedConfig.DELAYED_DATA_CHECK_CONFIG); PARSER.declareInt(Builder::setMaxEmptySearches, DatafeedConfig.MAX_EMPTY_SEARCHES); + PARSER.declareObject(Builder::setIndicesOptions, + (p, c) -> IndicesOptions.fromMap(p.map(), new IndicesOptions(IndicesOptions.Option.NONE, IndicesOptions.WildcardStates.NONE)), + DatafeedConfig.INDICES_OPTIONS); } private static BytesReference parseBytes(XContentParser parser) throws 
IOException { @@ -100,11 +104,12 @@ public class DatafeedUpdate implements ToXContentObject { private final ChunkingConfig chunkingConfig; private final DelayedDataCheckConfig delayedDataCheckConfig; private final Integer maxEmptySearches; + private final IndicesOptions indicesOptions; private DatafeedUpdate(String id, String jobId, TimeValue queryDelay, TimeValue frequency, List indices, BytesReference query, BytesReference aggregations, List scriptFields, Integer scrollSize, ChunkingConfig chunkingConfig, DelayedDataCheckConfig delayedDataCheckConfig, - Integer maxEmptySearches) { + Integer maxEmptySearches, IndicesOptions indicesOptions) { this.id = id; this.jobId = jobId; this.queryDelay = queryDelay; @@ -117,6 +122,7 @@ public class DatafeedUpdate implements ToXContentObject { this.chunkingConfig = chunkingConfig; this.delayedDataCheckConfig = delayedDataCheckConfig; this.maxEmptySearches = maxEmptySearches; + this.indicesOptions = indicesOptions; } /** @@ -157,6 +163,11 @@ public class DatafeedUpdate implements ToXContentObject { addOptionalField(builder, DatafeedConfig.SCROLL_SIZE, scrollSize); addOptionalField(builder, DatafeedConfig.CHUNKING_CONFIG, chunkingConfig); addOptionalField(builder, DatafeedConfig.MAX_EMPTY_SEARCHES, maxEmptySearches); + if (indicesOptions != null) { + builder.startObject(DatafeedConfig.INDICES_OPTIONS.getPreferredName()); + indicesOptions.toXContent(builder, params); + builder.endObject(); + } builder.endObject(); return builder; } @@ -211,6 +222,10 @@ public class DatafeedUpdate implements ToXContentObject { return maxEmptySearches; } + public IndicesOptions getIndicesOptions() { + return indicesOptions; + } + private static Map asMap(BytesReference bytesReference) { return bytesReference == null ? null : XContentHelper.convertToMap(bytesReference, true, XContentType.JSON).v2(); } @@ -247,7 +262,8 @@ public class DatafeedUpdate implements ToXContentObject { && Objects.equals(this.delayedDataCheckConfig, that.delayedDataCheckConfig) && Objects.equals(this.scriptFields, that.scriptFields) && Objects.equals(this.chunkingConfig, that.chunkingConfig) - && Objects.equals(this.maxEmptySearches, that.maxEmptySearches); + && Objects.equals(this.maxEmptySearches, that.maxEmptySearches) + && Objects.equals(this.indicesOptions, that.indicesOptions); } /** @@ -258,7 +274,7 @@ public class DatafeedUpdate implements ToXContentObject { @Override public int hashCode() { return Objects.hash(id, jobId, frequency, queryDelay, indices, asMap(query), scrollSize, asMap(aggregations), scriptFields, - chunkingConfig, delayedDataCheckConfig, maxEmptySearches); + chunkingConfig, delayedDataCheckConfig, maxEmptySearches, indicesOptions); } public static Builder builder(String id) { @@ -279,6 +295,7 @@ public class DatafeedUpdate implements ToXContentObject { private ChunkingConfig chunkingConfig; private DelayedDataCheckConfig delayedDataCheckConfig; private Integer maxEmptySearches; + private IndicesOptions indicesOptions; public Builder(String id) { this.id = Objects.requireNonNull(id, DatafeedConfig.ID.getPreferredName()); @@ -297,6 +314,7 @@ public class DatafeedUpdate implements ToXContentObject { this.chunkingConfig = config.chunkingConfig; this.delayedDataCheckConfig = config.delayedDataCheckConfig; this.maxEmptySearches = config.maxEmptySearches; + this.indicesOptions = config.indicesOptions; } @Deprecated @@ -381,9 +399,14 @@ public class DatafeedUpdate implements ToXContentObject { return this; } + public Builder setIndicesOptions(IndicesOptions indicesOptions) { + 
this.indicesOptions = indicesOptions; + return this; + } + public DatafeedUpdate build() { return new DatafeedUpdate(id, jobId, queryDelay, frequency, indices, query, aggregations, scriptFields, scrollSize, - chunkingConfig, delayedDataCheckConfig, maxEmptySearches); + chunkingConfig, delayedDataCheckConfig, maxEmptySearches, indicesOptions); } private static BytesReference xContentToBytes(ToXContentObject object) throws IOException { diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/datafeed/DatafeedConfigTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/datafeed/DatafeedConfigTests.java index 7f7a03ab2e1..16bd25bb492 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/datafeed/DatafeedConfigTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/datafeed/DatafeedConfigTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.client.ml.datafeed; import com.carrotsearch.randomizedtesting.generators.CodepointSetGenerator; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.DeprecationHandler; import org.elasticsearch.common.xcontent.NamedXContentRegistry; @@ -109,6 +110,13 @@ public class DatafeedConfigTests extends AbstractXContentTestCase { public static class Request extends AcknowledgedRequest implements ToXContentObject { - public static Request parseRequest(String datafeedId, XContentParser parser) { + public static Request parseRequest(String datafeedId, IndicesOptions indicesOptions, XContentParser parser) { DatafeedConfig.Builder datafeed = DatafeedConfig.STRICT_PARSER.apply(parser, null); + if (datafeed.getIndicesOptions() == null) { + datafeed.setIndicesOptions(indicesOptions); + } datafeed.setId(datafeedId); return new Request(datafeed.build()); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StartDatafeedAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StartDatafeedAction.java index 33102cf93a9..d9676a52674 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StartDatafeedAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StartDatafeedAction.java @@ -11,6 +11,8 @@ import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.ValidateActions; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.MasterNodeRequest; import org.elasticsearch.client.ElasticsearchClient; @@ -150,6 +152,9 @@ public class StartDatafeedAction extends ActionType { params.setTimeout(TimeValue.parseTimeValue(val, TIMEOUT.getPreferredName())), TIMEOUT); PARSER.declareString(DatafeedParams::setJobId, Job.ID); PARSER.declareStringArray(DatafeedParams::setDatafeedIndices, INDICES); + PARSER.declareObject(DatafeedParams::setIndicesOptions, + (p, c) -> IndicesOptions.fromMap(p.map(), SearchRequest.DEFAULT_INDICES_OPTIONS), + DatafeedConfig.INDICES_OPTIONS); } static long parseDateOrThrow(String date, ParseField paramName, LongSupplier now) { @@ -193,6 +198,11 @@ public class StartDatafeedAction extends ActionType { jobId = in.readOptionalString(); datafeedIndices = 
in.readStringList(); } + if (in.getVersion().onOrAfter(Version.V_7_7_0)) { + indicesOptions = IndicesOptions.readIndicesOptions(in); + } else { + indicesOptions = SearchRequest.DEFAULT_INDICES_OPTIONS; + } } DatafeedParams() { @@ -204,6 +214,7 @@ public class StartDatafeedAction extends ActionType { private TimeValue timeout = TimeValue.timeValueSeconds(20); private List datafeedIndices = Collections.emptyList(); private String jobId; + private IndicesOptions indicesOptions = SearchRequest.DEFAULT_INDICES_OPTIONS; public String getDatafeedId() { @@ -250,6 +261,15 @@ public class StartDatafeedAction extends ActionType { this.datafeedIndices = datafeedIndices; } + public IndicesOptions getIndicesOptions() { + return indicesOptions; + } + + public DatafeedParams setIndicesOptions(IndicesOptions indicesOptions) { + this.indicesOptions = ExceptionsHelper.requireNonNull(indicesOptions, DatafeedConfig.INDICES_OPTIONS); + return this; + } + @Override public String getWriteableName() { return MlTasks.DATAFEED_TASK_NAME; @@ -270,6 +290,9 @@ public class StartDatafeedAction extends ActionType { out.writeOptionalString(jobId); out.writeStringCollection(datafeedIndices); } + if (out.getVersion().onOrAfter(Version.V_7_7_0)) { + indicesOptions.writeIndicesOptions(out); + } } @Override @@ -287,13 +310,18 @@ public class StartDatafeedAction extends ActionType { if (datafeedIndices.isEmpty() == false) { builder.field(INDICES.getPreferredName(), datafeedIndices); } + + builder.startObject(DatafeedConfig.INDICES_OPTIONS.getPreferredName()); + indicesOptions.toXContent(builder, params); + builder.endObject(); + builder.endObject(); return builder; } @Override public int hashCode() { - return Objects.hash(datafeedId, startTime, endTime, timeout, jobId, datafeedIndices); + return Objects.hash(datafeedId, startTime, endTime, timeout, jobId, datafeedIndices, indicesOptions); } @Override @@ -310,6 +338,7 @@ public class StartDatafeedAction extends ActionType { Objects.equals(endTime, other.endTime) && Objects.equals(timeout, other.timeout) && Objects.equals(jobId, other.jobId) && + Objects.equals(indicesOptions, other.indicesOptions) && Objects.equals(datafeedIndices, other.datafeedIndices); } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateDatafeedAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateDatafeedAction.java index 5d82a216626..a5aebc61df2 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateDatafeedAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateDatafeedAction.java @@ -7,9 +7,11 @@ package org.elasticsearch.xpack.core.ml.action; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.master.AcknowledgedRequest; import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder; import org.elasticsearch.client.ElasticsearchClient; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.ToXContentObject; @@ -31,8 +33,11 @@ public class UpdateDatafeedAction extends ActionType public static class Request extends AcknowledgedRequest implements ToXContentObject { - public static Request parseRequest(String datafeedId, XContentParser parser) { + 
public static Request parseRequest(String datafeedId, @Nullable IndicesOptions indicesOptions, XContentParser parser) { DatafeedUpdate.Builder update = DatafeedUpdate.PARSER.apply(parser, null); + if (indicesOptions != null) { + update.setIndicesOptions(indicesOptions); + } update.setId(datafeedId); return new Request(update.build()); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java index 9c0f46e0fea..b7621ba7581 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java @@ -9,6 +9,8 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.AbstractDiffable; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.Strings; @@ -92,6 +94,7 @@ public class DatafeedConfig extends AbstractDiffable implements public static final ParseField HEADERS = new ParseField("headers"); public static final ParseField DELAYED_DATA_CHECK_CONFIG = new ParseField("delayed_data_check_config"); public static final ParseField MAX_EMPTY_SEARCHES = new ParseField("max_empty_searches"); + public static final ParseField INDICES_OPTIONS = new ParseField("indices_options"); // These parsers follow the pattern that metadata is parsed leniently (to allow for enhancements), whilst config is parsed strictly public static final ObjectParser LENIENT_PARSER = createParser(true); @@ -154,6 +157,9 @@ public class DatafeedConfig extends AbstractDiffable implements ignoreUnknownFields ? 
DelayedDataCheckConfig.LENIENT_PARSER : DelayedDataCheckConfig.STRICT_PARSER, DELAYED_DATA_CHECK_CONFIG); parser.declareInt(Builder::setMaxEmptySearches, MAX_EMPTY_SEARCHES); + parser.declareObject(Builder::setIndicesOptions, + (p, c) -> IndicesOptions.fromMap(p.map(), SearchRequest.DEFAULT_INDICES_OPTIONS), + INDICES_OPTIONS); return parser; } @@ -179,11 +185,12 @@ public class DatafeedConfig extends AbstractDiffable implements private final Map headers; private final DelayedDataCheckConfig delayedDataCheckConfig; private final Integer maxEmptySearches; + private final IndicesOptions indicesOptions; private DatafeedConfig(String id, String jobId, TimeValue queryDelay, TimeValue frequency, List indices, QueryProvider queryProvider, AggProvider aggProvider, List scriptFields, Integer scrollSize, ChunkingConfig chunkingConfig, Map headers, - DelayedDataCheckConfig delayedDataCheckConfig, Integer maxEmptySearches) { + DelayedDataCheckConfig delayedDataCheckConfig, Integer maxEmptySearches, IndicesOptions indicesOptions) { this.id = id; this.jobId = jobId; this.queryDelay = queryDelay; @@ -197,6 +204,7 @@ public class DatafeedConfig extends AbstractDiffable implements this.headers = Collections.unmodifiableMap(headers); this.delayedDataCheckConfig = delayedDataCheckConfig; this.maxEmptySearches = maxEmptySearches; + this.indicesOptions = ExceptionsHelper.requireNonNull(indicesOptions, INDICES_OPTIONS); } public DatafeedConfig(StreamInput in) throws IOException { @@ -242,6 +250,11 @@ public class DatafeedConfig extends AbstractDiffable implements } else { maxEmptySearches = null; } + if (in.getVersion().onOrAfter(Version.V_7_7_0)) { + indicesOptions = IndicesOptions.readIndicesOptions(in); + } else { + indicesOptions = SearchRequest.DEFAULT_INDICES_OPTIONS; + } } /** @@ -414,6 +427,10 @@ public class DatafeedConfig extends AbstractDiffable implements return maxEmptySearches; } + public IndicesOptions getIndicesOptions() { + return indicesOptions; + } + @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(id); @@ -455,6 +472,9 @@ public class DatafeedConfig extends AbstractDiffable implements if (out.getVersion().onOrAfter(Version.V_7_5_0)) { out.writeOptionalVInt(maxEmptySearches); } + if (out.getVersion().onOrAfter(Version.V_7_7_0)) { + indicesOptions.writeIndicesOptions(out); + } } @Override @@ -494,6 +514,10 @@ public class DatafeedConfig extends AbstractDiffable implements if (maxEmptySearches != null) { builder.field(MAX_EMPTY_SEARCHES.getPreferredName(), maxEmptySearches); } + builder.startObject(INDICES_OPTIONS.getPreferredName()); + indicesOptions.toXContent(builder, params); + builder.endObject(); + builder.endObject(); return builder; } @@ -527,13 +551,14 @@ public class DatafeedConfig extends AbstractDiffable implements && Objects.equals(this.chunkingConfig, that.chunkingConfig) && Objects.equals(this.headers, that.headers) && Objects.equals(this.delayedDataCheckConfig, that.delayedDataCheckConfig) - && Objects.equals(this.maxEmptySearches, that.maxEmptySearches); + && Objects.equals(this.maxEmptySearches, that.maxEmptySearches) + && Objects.equals(this.indicesOptions, that.indicesOptions); } @Override public int hashCode() { return Objects.hash(id, jobId, frequency, queryDelay, indices, queryProvider, scrollSize, aggProvider, scriptFields, chunkingConfig, - headers, delayedDataCheckConfig, maxEmptySearches); + headers, delayedDataCheckConfig, maxEmptySearches, indicesOptions); } @Override @@ -607,6 +632,7 @@ public class DatafeedConfig extends 
AbstractDiffable implements private Map headers = Collections.emptyMap(); private DelayedDataCheckConfig delayedDataCheckConfig = DelayedDataCheckConfig.defaultDelayedDataCheckConfig(); private Integer maxEmptySearches; + private IndicesOptions indicesOptions; public Builder() { } @@ -630,6 +656,7 @@ public class DatafeedConfig extends AbstractDiffable implements this.headers = new HashMap<>(config.headers); this.delayedDataCheckConfig = config.getDelayedDataCheckConfig(); this.maxEmptySearches = config.getMaxEmptySearches(); + this.indicesOptions = config.indicesOptions; } public void setId(String datafeedId) { @@ -735,6 +762,15 @@ public class DatafeedConfig extends AbstractDiffable implements } } + public Builder setIndicesOptions(IndicesOptions indicesOptions) { + this.indicesOptions = indicesOptions; + return this; + } + + public IndicesOptions getIndicesOptions() { + return this.indicesOptions; + } + public DatafeedConfig build() { ExceptionsHelper.requireNonNull(id, ID.getPreferredName()); ExceptionsHelper.requireNonNull(jobId, Job.ID.getPreferredName()); @@ -749,8 +785,11 @@ public class DatafeedConfig extends AbstractDiffable implements setDefaultChunkingConfig(); setDefaultQueryDelay(); + if (indicesOptions == null) { + indicesOptions = SearchRequest.DEFAULT_INDICES_OPTIONS; + } return new DatafeedConfig(id, jobId, queryDelay, frequency, indices, queryProvider, aggProvider, scriptFields, scrollSize, - chunkingConfig, headers, delayedDataCheckConfig, maxEmptySearches); + chunkingConfig, headers, delayedDataCheckConfig, maxEmptySearches, indicesOptions); } void validateScriptFields() { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java index f9b590730cd..2fedc1f127c 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java @@ -7,6 +7,8 @@ package org.elasticsearch.xpack.core.ml.datafeed; import org.apache.logging.log4j.LogManager; import org.elasticsearch.Version; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; @@ -82,6 +84,9 @@ public class DatafeedUpdate implements Writeable, ToXContentObject { DelayedDataCheckConfig.STRICT_PARSER, DatafeedConfig.DELAYED_DATA_CHECK_CONFIG); PARSER.declareInt(Builder::setMaxEmptySearches, DatafeedConfig.MAX_EMPTY_SEARCHES); + PARSER.declareObject(Builder::setIndicesOptions, + (p, c) -> IndicesOptions.fromMap(p.map(), SearchRequest.DEFAULT_INDICES_OPTIONS), + DatafeedConfig.INDICES_OPTIONS); } private final String id; @@ -96,12 +101,13 @@ public class DatafeedUpdate implements Writeable, ToXContentObject { private final ChunkingConfig chunkingConfig; private final DelayedDataCheckConfig delayedDataCheckConfig; private final Integer maxEmptySearches; + private final IndicesOptions indicesOptions; private DatafeedUpdate(String id, String jobId, TimeValue queryDelay, TimeValue frequency, List indices, QueryProvider queryProvider, AggProvider aggProvider, List scriptFields, Integer scrollSize, ChunkingConfig chunkingConfig, DelayedDataCheckConfig delayedDataCheckConfig, - Integer maxEmptySearches) { + Integer maxEmptySearches, IndicesOptions indicesOptions) 
{ this.id = id; this.jobId = jobId; this.queryDelay = queryDelay; @@ -114,6 +120,7 @@ public class DatafeedUpdate implements Writeable, ToXContentObject { this.chunkingConfig = chunkingConfig; this.delayedDataCheckConfig = delayedDataCheckConfig; this.maxEmptySearches = maxEmptySearches; + this.indicesOptions = indicesOptions; } public DatafeedUpdate(StreamInput in) throws IOException { @@ -156,6 +163,11 @@ public class DatafeedUpdate implements Writeable, ToXContentObject { } else { maxEmptySearches = null; } + if (in.getVersion().onOrAfter(Version.V_7_7_0)) { + indicesOptions = in.readBoolean() ? IndicesOptions.readIndicesOptions(in) : null; + } else { + indicesOptions = null; + } } /** @@ -204,6 +216,14 @@ public class DatafeedUpdate implements Writeable, ToXContentObject { if (out.getVersion().onOrAfter(Version.V_7_5_0)) { out.writeOptionalInt(maxEmptySearches); } + if (out.getVersion().onOrAfter(Version.V_7_7_0)) { + if (indicesOptions != null) { + out.writeBoolean(true); + indicesOptions.writeIndicesOptions(out); + } else { + out.writeBoolean(false); + } + } } @Override @@ -235,7 +255,11 @@ public class DatafeedUpdate implements Writeable, ToXContentObject { addOptionalField(builder, DatafeedConfig.CHUNKING_CONFIG, chunkingConfig); addOptionalField(builder, DatafeedConfig.DELAYED_DATA_CHECK_CONFIG, delayedDataCheckConfig); addOptionalField(builder, DatafeedConfig.MAX_EMPTY_SEARCHES, maxEmptySearches); - + if (indicesOptions != null) { + builder.startObject(DatafeedConfig.INDICES_OPTIONS.getPreferredName()); + indicesOptions.toXContent(builder, params); + builder.endObject(); + } builder.endObject(); return builder; } @@ -308,6 +332,10 @@ public class DatafeedUpdate implements Writeable, ToXContentObject { return maxEmptySearches; } + public IndicesOptions getIndicesOptions() { + return indicesOptions; + } + /** * Applies the update to the given {@link DatafeedConfig} * @return a new {@link DatafeedConfig} that contains the update @@ -355,6 +383,9 @@ public class DatafeedUpdate implements Writeable, ToXContentObject { if (maxEmptySearches != null) { builder.setMaxEmptySearches(maxEmptySearches); } + if (indicesOptions != null) { + builder.setIndicesOptions(indicesOptions); + } if (headers.isEmpty() == false) { // Adjust the request, adding security headers from the current thread context @@ -395,13 +426,14 @@ public class DatafeedUpdate implements Writeable, ToXContentObject { && Objects.equals(this.delayedDataCheckConfig, that.delayedDataCheckConfig) && Objects.equals(this.scriptFields, that.scriptFields) && Objects.equals(this.chunkingConfig, that.chunkingConfig) - && Objects.equals(this.maxEmptySearches, that.maxEmptySearches); + && Objects.equals(this.maxEmptySearches, that.maxEmptySearches) + && Objects.equals(this.indicesOptions, that.indicesOptions); } @Override public int hashCode() { return Objects.hash(id, jobId, frequency, queryDelay, indices, queryProvider, scrollSize, aggProvider, scriptFields, chunkingConfig, - delayedDataCheckConfig, maxEmptySearches); + delayedDataCheckConfig, maxEmptySearches, indicesOptions); } @Override @@ -420,7 +452,8 @@ public class DatafeedUpdate implements Writeable, ToXContentObject { && (delayedDataCheckConfig == null || Objects.equals(delayedDataCheckConfig, datafeed.getDelayedDataCheckConfig())) && (chunkingConfig == null || Objects.equals(chunkingConfig, datafeed.getChunkingConfig())) && (maxEmptySearches == null || Objects.equals(maxEmptySearches, datafeed.getMaxEmptySearches()) - || (maxEmptySearches == -1 && 
datafeed.getMaxEmptySearches() == null)); + || (maxEmptySearches == -1 && datafeed.getMaxEmptySearches() == null)) + && (indicesOptions == null || Objects.equals(indicesOptions, datafeed.getIndicesOptions())); } public static class Builder { @@ -437,6 +470,7 @@ public class DatafeedUpdate implements Writeable, ToXContentObject { private ChunkingConfig chunkingConfig; private DelayedDataCheckConfig delayedDataCheckConfig; private Integer maxEmptySearches; + private IndicesOptions indicesOptions; public Builder() { } @@ -458,6 +492,7 @@ public class DatafeedUpdate implements Writeable, ToXContentObject { this.chunkingConfig = config.chunkingConfig; this.delayedDataCheckConfig = config.delayedDataCheckConfig; this.maxEmptySearches = config.maxEmptySearches; + this.indicesOptions = config.indicesOptions; } public Builder setId(String datafeedId) { @@ -535,9 +570,14 @@ public class DatafeedUpdate implements Writeable, ToXContentObject { return this; } + public Builder setIndicesOptions(IndicesOptions indicesOptions) { + this.indicesOptions = indicesOptions; + return this; + } + public DatafeedUpdate build() { return new DatafeedUpdate(id, jobId, queryDelay, frequency, indices, queryProvider, aggProvider, scriptFields, scrollSize, - chunkingConfig, delayedDataCheckConfig, maxEmptySearches); + chunkingConfig, delayedDataCheckConfig, maxEmptySearches, indicesOptions); } } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/ReservedFieldNames.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/ReservedFieldNames.java index 890ecbb87e5..c0b915f8632 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/ReservedFieldNames.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/ReservedFieldNames.java @@ -287,6 +287,7 @@ public final class ReservedFieldNames { DatafeedConfig.CHUNKING_CONFIG.getPreferredName(), DatafeedConfig.HEADERS.getPreferredName(), DatafeedConfig.DELAYED_DATA_CHECK_CONFIG.getPreferredName(), + DatafeedConfig.INDICES_OPTIONS.getPreferredName(), DelayedDataCheckConfig.ENABLED.getPreferredName(), DelayedDataCheckConfig.CHECK_WINDOW.getPreferredName(), diff --git a/x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/config_index_mappings.json b/x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/config_index_mappings.json index aa7211cf448..ec1daf22e35 100644 --- a/x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/config_index_mappings.json +++ b/x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/config_index_mappings.json @@ -290,6 +290,10 @@ "indices" : { "type" : "keyword" }, + "indices_options": { + "type" : "object", + "enabled" : false + }, "job_id" : { "type" : "keyword" }, diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/PutDatafeedActionRequestTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/PutDatafeedActionRequestTests.java index dfbc7e7a55a..633e54a479f 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/PutDatafeedActionRequestTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/PutDatafeedActionRequestTests.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.core.ml.action; +import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import 
org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Settings; @@ -13,7 +14,6 @@ import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.search.SearchModule; import org.elasticsearch.test.AbstractSerializingTestCase; import org.elasticsearch.xpack.core.ml.action.PutDatafeedAction.Request; -import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfigTests; import org.junit.Before; @@ -30,9 +30,7 @@ public class PutDatafeedActionRequestTests extends AbstractSerializingTestCase headers = new HashMap<>(); @@ -843,7 +856,7 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase jobTask; private final ClusterState clusterState; private final IndexNameExpressionResolver resolver; + private final IndicesOptions indicesOptions; public DatafeedNodeSelector(ClusterState clusterState, IndexNameExpressionResolver resolver, String datafeedId, - String jobId, List datafeedIndices) { + String jobId, List datafeedIndices, IndicesOptions indicesOptions) { PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); this.datafeedId = datafeedId; this.jobId = jobId; @@ -47,6 +49,7 @@ public class DatafeedNodeSelector { this.jobTask = MlTasks.getJobTask(jobId, tasks); this.clusterState = Objects.requireNonNull(clusterState); this.resolver = Objects.requireNonNull(resolver); + this.indicesOptions = Objects.requireNonNull(indicesOptions); } public void checkDatafeedTaskCanBeCreated() { @@ -117,25 +120,28 @@ public class DatafeedNodeSelector { } String[] concreteIndices; - String reason = "cannot start datafeed [" + datafeedId + "] because index [" - + index + "] does not exist, is closed, or is still initializing."; try { - concreteIndices = resolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpen(), index); + concreteIndices = resolver.concreteIndexNames(clusterState, indicesOptions, index); if (concreteIndices.length == 0) { - return new AssignmentFailure(reason, true); + return new AssignmentFailure("cannot start datafeed [" + datafeedId + "] because index [" + + index + "] does not exist, is closed, or is still initializing.", true); } } catch (Exception e) { - LOGGER.debug(reason, e); - return new AssignmentFailure(reason, true); + String msg = new ParameterizedMessage("failed resolving indices given [{}] and indices_options [{}]", + index, + indicesOptions).getFormattedMessage(); + LOGGER.debug("[" + datafeedId + "] " + msg, e); + return new AssignmentFailure( + "cannot start datafeed [" + datafeedId + "] because it " + msg + " with exception [" + e.getMessage() + "]", + true); } for (String concreteIndex : concreteIndices) { IndexRoutingTable routingTable = clusterState.getRoutingTable().index(concreteIndex); if (routingTable == null || !routingTable.allPrimaryShardsActive()) { - reason = "cannot start datafeed [" + datafeedId + "] because index [" - + concreteIndex + "] does not have all primary shards active yet."; - return new AssignmentFailure(reason, false); + return new AssignmentFailure("cannot start datafeed [" + datafeedId + "] because index [" + + concreteIndex + "] does not have all primary shards active yet.", false); } } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/delayeddatacheck/DatafeedDelayedDataDetector.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/delayeddatacheck/DatafeedDelayedDataDetector.java index 
d2a933ffa46..0fa4411d839 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/delayeddatacheck/DatafeedDelayedDataDetector.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/delayeddatacheck/DatafeedDelayedDataDetector.java @@ -8,6 +8,7 @@ package org.elasticsearch.xpack.ml.datafeed.delayeddatacheck; import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.Client; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.index.query.QueryBuilder; @@ -27,6 +28,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.stream.Collectors; import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; @@ -46,15 +48,17 @@ public class DatafeedDelayedDataDetector implements DelayedDataDetector { private final String jobId; private final QueryBuilder datafeedQuery; private final String[] datafeedIndices; + private final IndicesOptions indicesOptions; DatafeedDelayedDataDetector(long bucketSpan, long window, String jobId, String timeField, QueryBuilder datafeedQuery, - String[] datafeedIndices, Client client) { + String[] datafeedIndices, IndicesOptions indicesOptions, Client client) { this.bucketSpan = bucketSpan; this.window = window; this.jobId = jobId; this.timeField = timeField; this.datafeedQuery = datafeedQuery; this.datafeedIndices = datafeedIndices; + this.indicesOptions = Objects.requireNonNull(indicesOptions); this.client = client; } @@ -115,7 +119,7 @@ public class DatafeedDelayedDataDetector implements DelayedDataDetector { .fixedInterval(new DateHistogramInterval(bucketSpan + "ms")).field(timeField)) .query(ExtractorUtils.wrapInTimeRangeQuery(datafeedQuery, timeField, start, end)); - SearchRequest searchRequest = new SearchRequest(datafeedIndices).source(searchSourceBuilder); + SearchRequest searchRequest = new SearchRequest(datafeedIndices).source(searchSourceBuilder).indicesOptions(indicesOptions); try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashWithOrigin(ML_ORIGIN)) { SearchResponse response = client.execute(SearchAction.INSTANCE, searchRequest).actionGet(); List buckets = ((Histogram)response.getAggregations().get(DATE_BUCKETS)).getBuckets(); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/delayeddatacheck/DelayedDataDetectorFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/delayeddatacheck/DelayedDataDetectorFactory.java index 88f8e6caadf..23a5de6baa0 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/delayeddatacheck/DelayedDataDetectorFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/delayeddatacheck/DelayedDataDetectorFactory.java @@ -51,6 +51,7 @@ public class DelayedDataDetectorFactory { job.getDataDescription().getTimeField(), datafeedConfig.getParsedQuery(xContentRegistry), datafeedConfig.getIndices().toArray(new String[0]), + datafeedConfig.getIndicesOptions(), client); } else { return new NullDelayedDataDetector(); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/DataExtractorFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/DataExtractorFactory.java index 
d43ede48d05..5863a4e05fd 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/DataExtractorFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/DataExtractorFactory.java @@ -79,7 +79,7 @@ public interface DataExtractorFactory { client, ClientHelper.ML_ORIGIN, GetRollupIndexCapsAction.INSTANCE, - new GetRollupIndexCapsAction.Request(datafeed.getIndices().toArray(new String[0])), + new GetRollupIndexCapsAction.Request(datafeed.getIndices().toArray(new String[0]), datafeed.getIndicesOptions()), getRollupIndexCapsActionHandler); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractor.java index 031db35ebac..aea9c9d7a75 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractor.java @@ -28,6 +28,7 @@ class AggregationDataExtractor extends AbstractAggregationDataExtractor headers; + final IndicesOptions indicesOptions; AggregationDataExtractorContext(String jobId, String timeField, Set fields, List indices, QueryBuilder query, AggregatorFactories.Builder aggs, long start, long end, boolean includeDocCount, - Map headers) { + Map headers, IndicesOptions indicesOptions) { this.jobId = Objects.requireNonNull(jobId); this.timeField = Objects.requireNonNull(timeField); this.fields = Objects.requireNonNull(fields); @@ -39,5 +41,6 @@ class AggregationDataExtractorContext { this.end = end; this.includeDocCount = includeDocCount; this.headers = headers; + this.indicesOptions = Objects.requireNonNull(indicesOptions); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorFactory.java index 40d186542df..93c5a12d82d 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorFactory.java @@ -50,7 +50,8 @@ public class AggregationDataExtractorFactory implements DataExtractorFactory { Intervals.alignToCeil(start, histogramInterval), Intervals.alignToFloor(end, histogramInterval), job.getAnalysisConfig().getSummaryCountFieldName().equals(DatafeedConfig.DOC_COUNT), - datafeedConfig.getHeaders()); + datafeedConfig.getHeaders(), + datafeedConfig.getIndicesOptions()); return new AggregationDataExtractor(client, dataExtractorContext, timingStatsReporter); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/RollupDataExtractor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/RollupDataExtractor.java index 93411610577..75b215e34c5 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/RollupDataExtractor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/RollupDataExtractor.java @@ -26,7 +26,9 @@ class RollupDataExtractor extends AbstractAggregationDataExtractor headers; 
final boolean hasAggregations; final Long histogramInterval; + final IndicesOptions indicesOptions; ChunkedDataExtractorContext(String jobId, String timeField, List indices, QueryBuilder query, int scrollSize, long start, long end, @Nullable TimeValue chunkSpan, TimeAligner timeAligner, Map headers, - boolean hasAggregations, @Nullable Long histogramInterval) { + boolean hasAggregations, @Nullable Long histogramInterval, IndicesOptions indicesOptions) { this.jobId = Objects.requireNonNull(jobId); this.timeField = Objects.requireNonNull(timeField); this.indices = indices.toArray(new String[indices.size()]); @@ -48,5 +50,6 @@ class ChunkedDataExtractorContext { this.headers = headers; this.hasAggregations = hasAggregations; this.histogramInterval = histogramInterval; + this.indicesOptions = Objects.requireNonNull(indicesOptions); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorFactory.java index 028409f6939..e153bafa096 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorFactory.java @@ -54,7 +54,8 @@ public class ChunkedDataExtractorFactory implements DataExtractorFactory { timeAligner, datafeedConfig.getHeaders(), datafeedConfig.hasAggregations(), - datafeedConfig.hasAggregations() ? datafeedConfig.getHistogramIntervalMillis(xContentRegistry) : null + datafeedConfig.hasAggregations() ? datafeedConfig.getHistogramIntervalMillis(xContentRegistry) : null, + datafeedConfig.getIndicesOptions() ); return new ChunkedDataExtractor(client, dataExtractorFactory, dataExtractorContext, timingStatsReporter); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractor.java index e0bf14b1bb1..8745ee49a11 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractor.java @@ -125,6 +125,7 @@ class ScrollDataExtractor implements DataExtractor { .setScroll(SCROLL_TIMEOUT) .addSort(context.extractedFields.timeField(), SortOrder.ASC) .setIndices(context.indices) + .setIndicesOptions(context.indicesOptions) .setSize(context.scrollSize) .setQuery(ExtractorUtils.wrapInTimeRangeQuery( context.query, context.extractedFields.timeField(), start, context.end)); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorContext.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorContext.java index 19dbfa669b1..7ec6b3b65c3 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorContext.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorContext.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.ml.datafeed.extractor.scroll; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.index.query.QueryBuilder; import 
org.elasticsearch.search.builder.SearchSourceBuilder; @@ -23,10 +24,11 @@ class ScrollDataExtractorContext { final long start; final long end; final Map headers; + final IndicesOptions indicesOptions; ScrollDataExtractorContext(String jobId, TimeBasedExtractedFields extractedFields, List indices, QueryBuilder query, List scriptFields, int scrollSize, long start, long end, - Map headers) { + Map headers, IndicesOptions indicesOptions) { this.jobId = Objects.requireNonNull(jobId); this.extractedFields = Objects.requireNonNull(extractedFields); this.indices = indices.toArray(new String[indices.size()]); @@ -36,5 +38,6 @@ class ScrollDataExtractorContext { this.start = start; this.end = end; this.headers = headers; + this.indicesOptions = Objects.requireNonNull(indicesOptions); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorFactory.java index 0c626ea7e22..7bc995df5b5 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorFactory.java @@ -25,7 +25,6 @@ import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory; import java.util.Objects; public class ScrollDataExtractorFactory implements DataExtractorFactory { - private final Client client; private final DatafeedConfig datafeedConfig; private final Job job; @@ -54,7 +53,9 @@ public class ScrollDataExtractorFactory implements DataExtractorFactory { datafeedConfig.getScrollSize(), start, end, - datafeedConfig.getHeaders()); + datafeedConfig.getHeaders(), + datafeedConfig.getIndicesOptions() + ); return new ScrollDataExtractor(client, dataExtractorContext, timingStatsReporter); } @@ -87,11 +88,13 @@ public class ScrollDataExtractorFactory implements DataExtractorFactory { // Step 1. Get field capabilities necessary to build the information of how to extract fields FieldCapabilitiesRequest fieldCapabilitiesRequest = new FieldCapabilitiesRequest(); - fieldCapabilitiesRequest.indices(datafeed.getIndices().toArray(new String[datafeed.getIndices().size()])); + fieldCapabilitiesRequest.indices(datafeed.getIndices().toArray(new String[0])).indicesOptions(datafeed.getIndicesOptions()); // We need capabilities for all fields matching the requested fields' parents so that we can work around // multi-fields that are not in source. - String[] requestFields = job.allInputFields().stream().map(f -> MlStrings.getParentField(f) + "*") - .toArray(size -> new String[size]); + String[] requestFields = job.allInputFields() + .stream() + .map(f -> MlStrings.getParentField(f) + "*") + .toArray(String[]::new); fieldCapabilitiesRequest.fields(requestFields); ClientHelper. 
executeWithHeaders(datafeed.getHeaders(), ClientHelper.ML_ORIGIN, client, () -> { client.execute(FieldCapabilitiesAction.INSTANCE, fieldCapabilitiesRequest, fieldCapabilitiesHandler); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestPutDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestPutDatafeedAction.java index 8b71e137bea..4b79abdfef6 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestPutDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestPutDatafeedAction.java @@ -5,6 +5,8 @@ */ package org.elasticsearch.xpack.ml.rest.datafeeds; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.rest.BaseRestHandler; @@ -44,8 +46,9 @@ public class RestPutDatafeedAction extends BaseRestHandler { @Override protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { String datafeedId = restRequest.param(DatafeedConfig.ID.getPreferredName()); + IndicesOptions indicesOptions = IndicesOptions.fromRequest(restRequest, SearchRequest.DEFAULT_INDICES_OPTIONS); XContentParser parser = restRequest.contentParser(); - PutDatafeedAction.Request putDatafeedRequest = PutDatafeedAction.Request.parseRequest(datafeedId, parser); + PutDatafeedAction.Request putDatafeedRequest = PutDatafeedAction.Request.parseRequest(datafeedId, indicesOptions, parser); putDatafeedRequest.timeout(restRequest.paramAsTime("timeout", putDatafeedRequest.timeout())); putDatafeedRequest.masterNodeTimeout(restRequest.paramAsTime("master_timeout", putDatafeedRequest.masterNodeTimeout())); return channel -> client.execute(PutDatafeedAction.INSTANCE, putDatafeedRequest, new RestToXContentListener<>(channel)); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestUpdateDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestUpdateDatafeedAction.java index 0375be2907b..cf804692133 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestUpdateDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestUpdateDatafeedAction.java @@ -5,6 +5,8 @@ */ package org.elasticsearch.xpack.ml.rest.datafeeds; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.rest.BaseRestHandler; @@ -44,8 +46,15 @@ public class RestUpdateDatafeedAction extends BaseRestHandler { @Override protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { String datafeedId = restRequest.param(DatafeedConfig.ID.getPreferredName()); + IndicesOptions indicesOptions = null; + if (restRequest.hasParam("expand_wildcards") + || restRequest.hasParam("ignore_unavailable") + || restRequest.hasParam("allow_no_indices") + || restRequest.hasParam("ignore_throttled")) { + indicesOptions = IndicesOptions.fromRequest(restRequest, SearchRequest.DEFAULT_INDICES_OPTIONS); + } XContentParser parser = restRequest.contentParser(); - UpdateDatafeedAction.Request updateDatafeedRequest = 
UpdateDatafeedAction.Request.parseRequest(datafeedId, parser); + UpdateDatafeedAction.Request updateDatafeedRequest = UpdateDatafeedAction.Request.parseRequest(datafeedId, indicesOptions, parser); updateDatafeedRequest.timeout(restRequest.paramAsTime("timeout", updateDatafeedRequest.timeout())); updateDatafeedRequest.masterNodeTimeout(restRequest.paramAsTime("master_timeout", updateDatafeedRequest.masterNodeTimeout())); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java index 39f3a3f4889..3c986b764a0 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml.datafeed; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; +import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -75,10 +76,19 @@ public class DatafeedNodeSelectorTests extends ESTestCase { givenClusterState("foo", 1, 0); - PersistentTasksCustomMetaData.Assignment result = - new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).selectNode(); + PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, + resolver, + df.getId(), + df.getJobId(), + df.getIndices(), + SearchRequest.DEFAULT_INDICES_OPTIONS).selectNode(); assertEquals("node_id", result.getExecutorNode()); - new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).checkDatafeedTaskCanBeCreated(); + new DatafeedNodeSelector(clusterState, + resolver, + df.getId(), + df.getJobId(), + df.getIndices(), + SearchRequest.DEFAULT_INDICES_OPTIONS).checkDatafeedTaskCanBeCreated(); } public void testSelectNode_GivenJobIsOpening() { @@ -91,10 +101,19 @@ public class DatafeedNodeSelectorTests extends ESTestCase { givenClusterState("foo", 1, 0); - PersistentTasksCustomMetaData.Assignment result = - new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).selectNode(); + PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, + resolver, + df.getId(), + df.getJobId(), + df.getIndices(), + SearchRequest.DEFAULT_INDICES_OPTIONS).selectNode(); assertEquals("node_id", result.getExecutorNode()); - new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).checkDatafeedTaskCanBeCreated(); + new DatafeedNodeSelector(clusterState, + resolver, + df.getId(), + df.getJobId(), + df.getIndices(), + SearchRequest.DEFAULT_INDICES_OPTIONS).checkDatafeedTaskCanBeCreated(); } public void testNoJobTask() { @@ -107,15 +126,23 @@ public class DatafeedNodeSelectorTests extends ESTestCase { givenClusterState("foo", 1, 0); - PersistentTasksCustomMetaData.Assignment result = - new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).selectNode(); + PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, + resolver, + df.getId(), + df.getJobId(), + df.getIndices(), + SearchRequest.DEFAULT_INDICES_OPTIONS).selectNode(); assertNull(result.getExecutorNode()); assertThat(result.getExplanation(), 
equalTo("cannot start datafeed [datafeed_id], because the job's [job_id] state is " + "[closed] while state [opened] is required")); ElasticsearchException e = expectThrows(ElasticsearchException.class, - () -> new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()) - .checkDatafeedTaskCanBeCreated()); + () -> new DatafeedNodeSelector(clusterState, + resolver, + df.getId(), + df.getJobId(), + df.getIndices(), + SearchRequest.DEFAULT_INDICES_OPTIONS).checkDatafeedTaskCanBeCreated()); assertThat(e.getMessage(), containsString("No node found to start datafeed [datafeed_id], allocation explanation " + "[cannot start datafeed [datafeed_id], because the job's [job_id] state is [closed] while state [opened] is required]")); } @@ -131,15 +158,23 @@ public class DatafeedNodeSelectorTests extends ESTestCase { givenClusterState("foo", 1, 0); - PersistentTasksCustomMetaData.Assignment result = - new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).selectNode(); + PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, + resolver, + df.getId(), + df.getJobId(), + df.getIndices(), + SearchRequest.DEFAULT_INDICES_OPTIONS).selectNode(); assertNull(result.getExecutorNode()); assertEquals("cannot start datafeed [datafeed_id], because the job's [job_id] state is [" + jobState + "] while state [opened] is required", result.getExplanation()); ElasticsearchException e = expectThrows(ElasticsearchException.class, - () -> new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()) - .checkDatafeedTaskCanBeCreated()); + () -> new DatafeedNodeSelector(clusterState, + resolver, + df.getId(), + df.getJobId(), + df.getIndices(), + SearchRequest.DEFAULT_INDICES_OPTIONS).checkDatafeedTaskCanBeCreated()); assertThat(e.getMessage(), containsString("No node found to start datafeed [datafeed_id], allocation explanation " + "[cannot start datafeed [datafeed_id], because the job's [job_id] state is [" + jobState + "] while state [opened] is required]")); @@ -160,13 +195,22 @@ public class DatafeedNodeSelectorTests extends ESTestCase { givenClusterState("foo", 1, 0, states); - PersistentTasksCustomMetaData.Assignment result = - new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).selectNode(); + PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, + resolver, + df.getId(), + df.getJobId(), + df.getIndices(), + SearchRequest.DEFAULT_INDICES_OPTIONS).selectNode(); assertNull(result.getExecutorNode()); assertThat(result.getExplanation(), equalTo("cannot start datafeed [datafeed_id] because index [foo] " + "does not have all primary shards active yet.")); - new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).checkDatafeedTaskCanBeCreated(); + new DatafeedNodeSelector(clusterState, + resolver, + df.getId(), + df.getJobId(), + df.getIndices(), + SearchRequest.DEFAULT_INDICES_OPTIONS).checkDatafeedTaskCanBeCreated(); } public void testShardNotAllActive() { @@ -185,13 +229,22 @@ public class DatafeedNodeSelectorTests extends ESTestCase { givenClusterState("foo", 2, 0, states); - PersistentTasksCustomMetaData.Assignment result = - new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).selectNode(); + PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, + resolver, + df.getId(), + df.getJobId(), + 
df.getIndices(), + SearchRequest.DEFAULT_INDICES_OPTIONS).selectNode(); assertNull(result.getExecutorNode()); assertThat(result.getExplanation(), equalTo("cannot start datafeed [datafeed_id] because index [foo] " + "does not have all primary shards active yet.")); - new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).checkDatafeedTaskCanBeCreated(); + new DatafeedNodeSelector(clusterState, + resolver, + df.getId(), + df.getJobId(), + df.getIndices(), + SearchRequest.DEFAULT_INDICES_OPTIONS).checkDatafeedTaskCanBeCreated(); } public void testIndexDoesntExist() { @@ -204,17 +257,32 @@ public class DatafeedNodeSelectorTests extends ESTestCase { givenClusterState("foo", 1, 0); - PersistentTasksCustomMetaData.Assignment result = - new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).selectNode(); + PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, + resolver, + df.getId(), + df.getJobId(), + df.getIndices(), + SearchRequest.DEFAULT_INDICES_OPTIONS).selectNode(); assertNull(result.getExecutorNode()); - assertThat(result.getExplanation(), equalTo("cannot start datafeed [datafeed_id] because index [not_foo] " + - "does not exist, is closed, or is still initializing.")); + assertThat(result.getExplanation(), + equalTo("cannot start datafeed [datafeed_id] because it failed resolving indices given [not_foo] and " + + "indices_options [IndicesOptions[ignore_unavailable=false, allow_no_indices=true, expand_wildcards_open=true, " + + "expand_wildcards_closed=false, expand_wildcards_hidden=false, allow_aliases_to_multiple_indices=true, " + + "forbid_closed_indices=true, ignore_aliases=false, ignore_throttled=true]] with exception [no such index [not_foo]]")); ElasticsearchException e = expectThrows(ElasticsearchException.class, - () -> new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()) - .checkDatafeedTaskCanBeCreated()); + () -> new DatafeedNodeSelector(clusterState, + resolver, + df.getId(), + df.getJobId(), + df.getIndices(), + SearchRequest.DEFAULT_INDICES_OPTIONS).checkDatafeedTaskCanBeCreated()); assertThat(e.getMessage(), containsString("No node found to start datafeed [datafeed_id], allocation explanation " - + "[cannot start datafeed [datafeed_id] because index [not_foo] does not exist, is closed, or is still initializing.]")); + + "[cannot start datafeed [datafeed_id] because it failed resolving " + + "indices given [not_foo] and indices_options [IndicesOptions[ignore_unavailable=false, allow_no_indices=true, " + + "expand_wildcards_open=true, expand_wildcards_closed=false, expand_wildcards_hidden=false, " + + "allow_aliases_to_multiple_indices=true, forbid_closed_indices=true, ignore_aliases=false, ignore_throttled=true]] " + + "with exception [no such index [not_foo]]]")); } public void testRemoteIndex() { @@ -227,8 +295,12 @@ public class DatafeedNodeSelectorTests extends ESTestCase { givenClusterState("foo", 1, 0); - PersistentTasksCustomMetaData.Assignment result = - new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).selectNode(); + PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, + resolver, + df.getId(), + df.getJobId(), + df.getIndices(), + SearchRequest.DEFAULT_INDICES_OPTIONS).selectNode(); assertNotNull(result.getExecutorNode()); } @@ -245,15 +317,23 @@ public class DatafeedNodeSelectorTests extends ESTestCase { givenClusterState("foo", 1, 0); 
- PersistentTasksCustomMetaData.Assignment result = - new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).selectNode(); + PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, + resolver, + df.getId(), + df.getJobId(), + df.getIndices(), + SearchRequest.DEFAULT_INDICES_OPTIONS).selectNode(); assertNull(result.getExecutorNode()); assertEquals("cannot start datafeed [datafeed_id], because the job's [job_id] state is stale", result.getExplanation()); ElasticsearchException e = expectThrows(ElasticsearchException.class, - () -> new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()) - .checkDatafeedTaskCanBeCreated()); + () -> new DatafeedNodeSelector(clusterState, + resolver, + df.getId(), + df.getJobId(), + df.getIndices(), + SearchRequest.DEFAULT_INDICES_OPTIONS).checkDatafeedTaskCanBeCreated()); assertThat(e.getMessage(), containsString("No node found to start datafeed [datafeed_id], allocation explanation " + "[cannot start datafeed [datafeed_id], because the job's [job_id] state is stale]")); @@ -261,9 +341,19 @@ public class DatafeedNodeSelectorTests extends ESTestCase { addJobTask(job.getId(), "node_id1", JobState.OPENED, tasksBuilder); tasks = tasksBuilder.build(); givenClusterState("foo", 1, 0); - result = new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).selectNode(); + result = new DatafeedNodeSelector(clusterState, + resolver, + df.getId(), + df.getJobId(), + df.getIndices(), + SearchRequest.DEFAULT_INDICES_OPTIONS).selectNode(); assertEquals("node_id1", result.getExecutorNode()); - new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).checkDatafeedTaskCanBeCreated(); + new DatafeedNodeSelector(clusterState, + resolver, + df.getId(), + df.getJobId(), + df.getIndices(), + SearchRequest.DEFAULT_INDICES_OPTIONS).checkDatafeedTaskCanBeCreated(); } public void testSelectNode_GivenJobOpeningAndIndexDoesNotExist() { @@ -280,10 +370,17 @@ public class DatafeedNodeSelectorTests extends ESTestCase { givenClusterState("foo", 1, 0); ElasticsearchException e = expectThrows(ElasticsearchException.class, - () -> new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()) - .checkDatafeedTaskCanBeCreated()); + () -> new DatafeedNodeSelector(clusterState, + resolver, + df.getId(), + df.getJobId(), + df.getIndices(), + SearchRequest.DEFAULT_INDICES_OPTIONS).checkDatafeedTaskCanBeCreated()); assertThat(e.getMessage(), containsString("No node found to start datafeed [datafeed_id], allocation explanation " - + "[cannot start datafeed [datafeed_id] because index [not_foo] does not exist, is closed, or is still initializing.]")); + + "[cannot start datafeed [datafeed_id] because it failed resolving indices given [not_foo] and " + + "indices_options [IndicesOptions[ignore_unavailable=false, allow_no_indices=true, expand_wildcards_open=true, " + + "expand_wildcards_closed=false, expand_wildcards_hidden=false, allow_aliases_to_multiple_indices=true, " + + "forbid_closed_indices=true, ignore_aliases=false, ignore_throttled=true]] with exception [no such index [not_foo]]]")); } public void testSelectNode_GivenMlUpgradeMode() { @@ -297,8 +394,12 @@ public class DatafeedNodeSelectorTests extends ESTestCase { givenClusterState("foo", 1, 0); - PersistentTasksCustomMetaData.Assignment result = - new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), 
df.getIndices()).selectNode(); + PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, + resolver, + df.getId(), + df.getJobId(), + df.getIndices(), + SearchRequest.DEFAULT_INDICES_OPTIONS).selectNode(); assertThat(result, equalTo(MlTasks.AWAITING_UPGRADE)); } @@ -314,8 +415,12 @@ public class DatafeedNodeSelectorTests extends ESTestCase { givenClusterState("foo", 1, 0); ElasticsearchException e = expectThrows(ElasticsearchException.class, - () -> new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()) - .checkDatafeedTaskCanBeCreated()); + () -> new DatafeedNodeSelector(clusterState, + resolver, + df.getId(), + df.getJobId(), + df.getIndices(), + SearchRequest.DEFAULT_INDICES_OPTIONS).checkDatafeedTaskCanBeCreated()); assertThat(e.getMessage(), equalTo("Could not start datafeed [datafeed_id] as indices are being upgraded")); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorTests.java index 537a6b44d0b..72c3edb33cd 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorTests.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.ml.datafeed.extractor.aggregation; +import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.ShardSearchFailure; @@ -272,7 +273,7 @@ public class AggregationDataExtractorTests extends ESTestCase { private AggregationDataExtractorContext createContext(long start, long end) { return new AggregationDataExtractorContext(jobId, timeField, fields, indices, query, aggs, start, end, true, - Collections.emptyMap()); + Collections.emptyMap(), SearchRequest.DEFAULT_INDICES_OPTIONS); } @SuppressWarnings("unchecked") diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorTests.java index e7ac3625439..657545d5f3e 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorTests.java @@ -565,7 +565,8 @@ public class ChunkedDataExtractorTests extends ESTestCase { private ChunkedDataExtractorContext createContext(long start, long end, boolean hasAggregations, Long histogramInterval) { return new ChunkedDataExtractorContext(jobId, timeField, indices, query, scrollSize, start, end, chunkSpan, - ChunkedDataExtractorFactory.newIdentityTimeAligner(), Collections.emptyMap(), hasAggregations, histogramInterval); + ChunkedDataExtractorFactory.newIdentityTimeAligner(), Collections.emptyMap(), hasAggregations, histogramInterval, + SearchRequest.DEFAULT_INDICES_OPTIONS); } private static class StubSubExtractor implements DataExtractor { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorTests.java 
b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorTests.java index 962bd9ee6d3..76e86a9a821 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorTests.java @@ -11,6 +11,7 @@ import org.elasticsearch.action.search.ClearScrollAction; import org.elasticsearch.action.search.ClearScrollRequest; import org.elasticsearch.action.search.ClearScrollResponse; import org.elasticsearch.action.search.SearchPhaseExecutionException; +import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.ShardSearchFailure; @@ -444,7 +445,7 @@ public class ScrollDataExtractorTests extends ESTestCase { List sFields = Arrays.asList(withoutSplit, withSplit); ScrollDataExtractorContext context = new ScrollDataExtractorContext(jobId, extractedFields, indices, - query, sFields, scrollSize, 1000, 2000, Collections.emptyMap()); + query, sFields, scrollSize, 1000, 2000, Collections.emptyMap(), SearchRequest.DEFAULT_INDICES_OPTIONS); TestDataExtractor extractor = new TestDataExtractor(context); @@ -490,7 +491,7 @@ public class ScrollDataExtractorTests extends ESTestCase { private ScrollDataExtractorContext createContext(long start, long end) { return new ScrollDataExtractorContext(jobId, extractedFields, indices, query, scriptFields, scrollSize, start, end, - Collections.emptyMap()); + Collections.emptyMap(), SearchRequest.DEFAULT_INDICES_OPTIONS); } private SearchResponse createEmptySearchResponse() { diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/api/ml.put_datafeed.json b/x-pack/plugin/src/test/resources/rest-api-spec/api/ml.put_datafeed.json index 3991c682ac2..30985f17800 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/api/ml.put_datafeed.json +++ b/x-pack/plugin/src/test/resources/rest-api-spec/api/ml.put_datafeed.json @@ -23,6 +23,31 @@ "body":{ "description":"The datafeed config", "required":true + }, + "params": { + "ignore_unavailable":{ + "type":"boolean", + "description":"Ignore unavailable indexes (default: false)" + }, + "allow_no_indices":{ + "type":"boolean", + "description":"Ignore if the source indices expressions resolves to no concrete indices (default: true)" + }, + "ignore_throttled":{ + "type":"boolean", + "description":"Ignore indices that are marked as throttled (default: true)" + }, + "expand_wildcards":{ + "type":"enum", + "options":[ + "open", + "closed", + "hidden", + "none", + "all" + ], + "description":"Whether source index expressions should get expanded to open or closed indices (default: open)" + } } } } diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/api/ml.update_datafeed.json b/x-pack/plugin/src/test/resources/rest-api-spec/api/ml.update_datafeed.json index d4c4a1f63d7..f820933c769 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/api/ml.update_datafeed.json +++ b/x-pack/plugin/src/test/resources/rest-api-spec/api/ml.update_datafeed.json @@ -23,6 +23,31 @@ "body":{ "description":"The datafeed update settings", "required":true + }, + "params": { + "ignore_unavailable":{ + "type":"boolean", + "description":"Ignore unavailable indexes (default: false)" + }, + "allow_no_indices":{ + "type":"boolean", + "description":"Ignore if the source indices expressions resolves to 
no concrete indices (default: true)" + }, + "ignore_throttled":{ + "type":"boolean", + "description":"Ignore indices that are marked as throttled (default: true)" + }, + "expand_wildcards":{ + "type":"enum", + "options":[ + "open", + "closed", + "hidden", + "none", + "all" + ], + "description":"Whether source index expressions should get expanded to open or closed indices (default: open)" + } } } } diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/datafeeds_crud.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/datafeeds_crud.yml index da4dfe7ec23..bca3996f227 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/datafeeds_crud.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/datafeeds_crud.yml @@ -415,3 +415,78 @@ setup: datafeed_id: test-datafeed-1 force: true - match: { acknowledged: true } +--- +"Test put and update datafeed with indices options": + - do: + ml.put_datafeed: + datafeed_id: test-datafeed-indices-options-1 + body: > + { + "job_id":"datafeeds-crud-1", + "indexes":["index-foo"], + "indices_options": { + "expand_wildcards": ["closed", "open"], + "ignore_throttled": false + } + } + - match: { datafeed_id: "test-datafeed-indices-options-1" } + + - do: + ml.get_datafeeds: + datafeed_id: test-datafeed-indices-options-1 + + - match: { datafeeds.0.indices_options.ignore_throttled: false } + - length: { datafeeds.0.indices_options.expand_wildcards: 2 } + - match: { datafeeds.0.indices_options.expand_wildcards.0: open } + - match: { datafeeds.0.indices_options.expand_wildcards.1: closed } + + - do: + ml.update_datafeed: + datafeed_id: test-datafeed-indices-options-1 + body: > + { + "indices_options": { + "ignore_throttled": true + } + } + - match: { datafeed_id: "test-datafeed-indices-options-1" } + - do: + ml.get_datafeeds: + datafeed_id: test-datafeed-indices-options-1 + + - match: { datafeeds.0.indices_options.ignore_throttled: true } +--- +"Test put and update datafeed with indices options in params": + - do: + ml.put_datafeed: + datafeed_id: test-datafeed-indices-options-params-1 + ignore_throttled: false + body: > + { + "job_id":"datafeeds-crud-1", + "indexes":["index-foo"] + } + - match: { datafeed_id: "test-datafeed-indices-options-params-1" } + + - do: + ml.get_datafeeds: + datafeed_id: test-datafeed-indices-options-params-1 + + - match: { datafeeds.0.indices_options.ignore_throttled: false } + + + - do: + ml.update_datafeed: + datafeed_id: test-datafeed-indices-options-params-1 + ignore_throttled: false + allow_no_indices: false + body: > + { + } + - match: { datafeed_id: "test-datafeed-indices-options-params-1" } + - do: + ml.get_datafeeds: + datafeed_id: test-datafeed-indices-options-params-1 + + - match: { datafeeds.0.indices_options.ignore_throttled: false } + - match: { datafeeds.0.indices_options.allow_no_indices: false } diff --git a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/40_ml_datafeed_crud.yml b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/40_ml_datafeed_crud.yml index c24318e0b19..08ea8e32df9 100644 --- a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/40_ml_datafeed_crud.yml +++ b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/40_ml_datafeed_crud.yml @@ -2,7 +2,6 @@ setup: - skip: version: "all" reason: "AwaitsFix https://github.com/elastic/elasticsearch/issues/42258" - --- "Put job and datafeed without aggs in old cluster": diff --git 
a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/40_ml_datafeed_crud.yml b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/40_ml_datafeed_crud.yml index 5d824714e00..15e907db715 100644 --- a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/40_ml_datafeed_crud.yml +++ b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/40_ml_datafeed_crud.yml @@ -2,7 +2,6 @@ setup: - skip: version: "all" reason: "AwaitsFix https://github.com/elastic/elasticsearch/issues/42258" - - do: cluster.health: wait_for_status: green
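
A note on the two REST handlers changed above: RestPutDatafeedAction now always derives an IndicesOptions from the request, falling back to SearchRequest.DEFAULT_INDICES_OPTIONS, while RestUpdateDatafeedAction only builds one when the caller explicitly passed expand_wildcards, ignore_unavailable, allow_no_indices or ignore_throttled, so an update without those params leaves the stored options untouched. A minimal standalone sketch of that decision logic follows (the class and method names are illustrative, not part of the change):

import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.rest.RestRequest;

final class DatafeedIndicesOptionsParamsSketch {

    // PUT: always resolve, defaulting to the search defaults when no params are present.
    static IndicesOptions resolveForPut(RestRequest restRequest) {
        return IndicesOptions.fromRequest(restRequest, SearchRequest.DEFAULT_INDICES_OPTIONS);
    }

    // Update: only override when at least one relevant param was explicitly supplied;
    // returning null means "keep whatever the datafeed already has stored".
    static IndicesOptions resolveForUpdate(RestRequest restRequest) {
        if (restRequest.hasParam("expand_wildcards")
                || restRequest.hasParam("ignore_unavailable")
                || restRequest.hasParam("allow_no_indices")
                || restRequest.hasParam("ignore_throttled")) {
            return IndicesOptions.fromRequest(restRequest, SearchRequest.DEFAULT_INDICES_OPTIONS);
        }
        return null;
    }
}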
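
The new DatafeedNodeSelectorTests assertions above also pin down the failure message produced when index resolution fails under the datafeed's indices options. The production code is not part of this excerpt, but a rough sketch of the implied shape, assuming resolution goes through IndexNameExpressionResolver.concreteIndexNames with the datafeed's IndicesOptions (class, method name and exception wiring are inferred, not verbatim from the change):

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;

import java.util.List;

final class DatafeedIndexResolutionSketch {

    // Resolves the datafeed's index patterns with its configured IndicesOptions and turns a
    // resolution failure into the explanation format asserted in DatafeedNodeSelectorTests,
    // e.g. "... failed resolving indices given [not_foo] and indices_options
    // [IndicesOptions[...]] with exception [no such index [not_foo]]".
    static String[] resolveOrThrow(IndexNameExpressionResolver resolver,
                                   ClusterState clusterState,
                                   String datafeedId,
                                   List<String> indices,
                                   IndicesOptions indicesOptions) {
        try {
            return resolver.concreteIndexNames(clusterState, indicesOptions, indices.toArray(new String[0]));
        } catch (Exception e) {
            throw new ElasticsearchException("cannot start datafeed [" + datafeedId + "] because it failed resolving indices given "
                + indices + " and indices_options [" + indicesOptions + "] with exception [" + e.getMessage() + "]", e);
        }
    }
}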
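
Finally, the REST spec and YAML test additions above exercise indices_options both in the request body and as URL params. From Java, the equivalent put can be sketched against the client-side DatafeedConfig additions in this change; the RestHighLevelClient instance is assumed to exist, and the datafeed id, job id and index name are taken from the YAML test purely for illustration:

import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.ml.PutDatafeedRequest;
import org.elasticsearch.client.ml.PutDatafeedResponse;
import org.elasticsearch.client.ml.datafeed.DatafeedConfig;

import java.io.IOException;
import java.util.Collections;

final class DatafeedIndicesOptionsClientSketch {

    static PutDatafeedResponse putDatafeedWithIndicesOptions(RestHighLevelClient client) throws IOException {
        // Expand wildcards to both open and closed indices, mirroring the
        // "expand_wildcards": ["closed", "open"] body used in the YAML test above
        // (ignore_throttled is left at its default in this overload).
        IndicesOptions indicesOptions = IndicesOptions.fromOptions(
            false,  // ignoreUnavailable
            true,   // allowNoIndices
            true,   // expandToOpenIndices
            true);  // expandToClosedIndices

        DatafeedConfig datafeed = DatafeedConfig.builder("test-datafeed-indices-options-1", "datafeeds-crud-1")
            .setIndices(Collections.singletonList("index-foo"))
            .setIndicesOptions(indicesOptions)
            .build();

        return client.machineLearning().putDatafeed(new PutDatafeedRequest(datafeed), RequestOptions.DEFAULT);
    }
}

The ml.get_datafeeds assertions in the YAML test then confirm that the options are persisted with the datafeed configuration rather than applied only to the current request.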