From eac38e9847ae3cc89b89acdffade306fe78dda71 Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Thu, 27 Feb 2020 13:43:25 -0500 Subject: [PATCH] [ML] Add indices_options to datafeed config and update (#52793) (#52905) This adds a new configurable field called `indices_options`. This allows users to create or update the indices_options used when a datafeed reads from an index. This is necessary for the following use cases: - Reading from frozen indices - Allowing certain indices in multiple index patterns to not exist yet These index options are available on datafeed creation and update. Users may specify them as URL parameters or within the configuration object. closes https://github.com/elastic/elasticsearch/issues/48056 --- .../client/ml/datafeed/DatafeedConfig.java | 32 ++- .../client/ml/datafeed/DatafeedUpdate.java | 31 ++- .../ml/datafeed/DatafeedConfigTests.java | 8 + .../ml/datafeed/DatafeedUpdateTests.java | 8 + .../apis/put-datafeed.asciidoc | 5 + .../apis/update-datafeed.asciidoc | 3 + docs/reference/ml/ml-shared.asciidoc | 13 ++ .../core/ml/action/PutDatafeedAction.java | 6 +- .../core/ml/action/StartDatafeedAction.java | 31 ++- .../core/ml/action/UpdateDatafeedAction.java | 7 +- .../core/ml/datafeed/DatafeedConfig.java | 47 ++++- .../core/ml/datafeed/DatafeedUpdate.java | 52 ++++- .../ml/job/results/ReservedFieldNames.java | 1 + .../xpack/core/ml/config_index_mappings.json | 4 + .../action/PutDatafeedActionRequestTests.java | 8 +- .../UpdateDatafeedActionRequestTests.java | 2 +- .../core/ml/datafeed/DatafeedConfigTests.java | 31 ++- .../core/ml/datafeed/DatafeedUpdateTests.java | 40 +++- .../ml/integration/DatafeedJobsRestIT.java | 76 +++++++ .../action/TransportStartDatafeedAction.java | 10 +- .../ml/datafeed/DatafeedNodeSelector.java | 26 ++- .../DatafeedDelayedDataDetector.java | 8 +- .../DelayedDataDetectorFactory.java | 1 + .../extractor/DataExtractorFactory.java | 2 +- .../aggregation/AggregationDataExtractor.java | 1 + .../AggregationDataExtractorContext.java | 5 +- .../AggregationDataExtractorFactory.java | 3 +- .../aggregation/RollupDataExtractor.java | 4 +- .../RollupDataExtractorFactory.java | 3 +- .../chunked/ChunkedDataExtractor.java | 5 +- .../chunked/ChunkedDataExtractorContext.java | 5 +- .../chunked/ChunkedDataExtractorFactory.java | 3 +- .../extractor/scroll/ScrollDataExtractor.java | 1 + .../scroll/ScrollDataExtractorContext.java | 5 +- .../scroll/ScrollDataExtractorFactory.java | 13 +- .../rest/datafeeds/RestPutDatafeedAction.java | 5 +- .../datafeeds/RestUpdateDatafeedAction.java | 11 +- .../datafeed/DatafeedNodeSelectorTests.java | 189 ++++++++++++++---- .../AggregationDataExtractorTests.java | 3 +- .../chunked/ChunkedDataExtractorTests.java | 3 +- .../scroll/ScrollDataExtractorTests.java | 5 +- .../rest-api-spec/api/ml.put_datafeed.json | 25 +++ .../rest-api-spec/api/ml.update_datafeed.json | 25 +++ .../rest-api-spec/test/ml/datafeeds_crud.yml | 75 +++++++ .../test/old_cluster/40_ml_datafeed_crud.yml | 1 - .../upgraded_cluster/40_ml_datafeed_crud.yml | 1 - 46 files changed, 732 insertions(+), 111 deletions(-) diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/datafeed/DatafeedConfig.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/datafeed/DatafeedConfig.java index f192b420eba..0b53c2e1852 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/datafeed/DatafeedConfig.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/datafeed/DatafeedConfig.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.client.ml.datafeed; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.ml.job.config.Job; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.bytes.BytesArray; @@ -63,6 +64,7 @@ public class DatafeedConfig implements ToXContentObject { public static final ParseField CHUNKING_CONFIG = new ParseField("chunking_config"); public static final ParseField DELAYED_DATA_CHECK_CONFIG = new ParseField("delayed_data_check_config"); public static final ParseField MAX_EMPTY_SEARCHES = new ParseField("max_empty_searches"); + public static final ParseField INDICES_OPTIONS = new ParseField("indices_options"); public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( "datafeed_config", true, a -> new Builder((String)a[0], (String)a[1])); @@ -90,6 +92,9 @@ public class DatafeedConfig implements ToXContentObject { PARSER.declareObject(Builder::setChunkingConfig, ChunkingConfig.PARSER, CHUNKING_CONFIG); PARSER.declareObject(Builder::setDelayedDataCheckConfig, DelayedDataCheckConfig.PARSER, DELAYED_DATA_CHECK_CONFIG); PARSER.declareInt(Builder::setMaxEmptySearches, MAX_EMPTY_SEARCHES); + PARSER.declareObject(Builder::setIndicesOptions, + (p, c) -> IndicesOptions.fromMap(p.map(), new IndicesOptions(IndicesOptions.Option.NONE, IndicesOptions.WildcardStates.NONE)), + INDICES_OPTIONS); } private static BytesReference parseBytes(XContentParser parser) throws IOException { @@ -110,11 +115,12 @@ public class DatafeedConfig implements ToXContentObject { private final ChunkingConfig chunkingConfig; private final DelayedDataCheckConfig delayedDataCheckConfig; private final Integer maxEmptySearches; + private final IndicesOptions indicesOptions; private DatafeedConfig(String id, String jobId, TimeValue queryDelay, TimeValue frequency, List indices, BytesReference query, BytesReference aggregations, List scriptFields, Integer scrollSize, ChunkingConfig chunkingConfig, DelayedDataCheckConfig delayedDataCheckConfig, - Integer maxEmptySearches) { + Integer maxEmptySearches, IndicesOptions indicesOptions) { this.id = id; this.jobId = jobId; this.queryDelay = queryDelay; @@ -127,6 +133,7 @@ public class DatafeedConfig implements ToXContentObject { this.chunkingConfig = chunkingConfig; this.delayedDataCheckConfig = delayedDataCheckConfig; this.maxEmptySearches = maxEmptySearches; + this.indicesOptions = indicesOptions; } public String getId() { @@ -177,6 +184,10 @@ public class DatafeedConfig implements ToXContentObject { return maxEmptySearches; } + public IndicesOptions getIndicesOptions() { + return indicesOptions; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); @@ -216,6 +227,11 @@ public class DatafeedConfig implements ToXContentObject { if (maxEmptySearches != null) { builder.field(MAX_EMPTY_SEARCHES.getPreferredName(), maxEmptySearches); } + if (indicesOptions != null) { + builder.startObject(INDICES_OPTIONS.getPreferredName()); + indicesOptions.toXContent(builder, params); + builder.endObject(); + } builder.endObject(); return builder; @@ -257,7 +273,8 @@ public class DatafeedConfig implements ToXContentObject { && Objects.equals(this.scriptFields, that.scriptFields) && Objects.equals(this.chunkingConfig, that.chunkingConfig) && Objects.equals(this.delayedDataCheckConfig, that.delayedDataCheckConfig) - && Objects.equals(this.maxEmptySearches, that.maxEmptySearches); + && Objects.equals(this.maxEmptySearches, that.maxEmptySearches) + && Objects.equals(this.indicesOptions, that.indicesOptions); } /** @@ -268,7 +285,7 @@ public class DatafeedConfig implements ToXContentObject { @Override public int hashCode() { return Objects.hash(id, jobId, frequency, queryDelay, indices, asMap(query), scrollSize, asMap(aggregations), scriptFields, - chunkingConfig, delayedDataCheckConfig, maxEmptySearches); + chunkingConfig, delayedDataCheckConfig, maxEmptySearches, indicesOptions); } public static Builder builder(String id, String jobId) { @@ -289,6 +306,7 @@ public class DatafeedConfig implements ToXContentObject { private ChunkingConfig chunkingConfig; private DelayedDataCheckConfig delayedDataCheckConfig; private Integer maxEmptySearches; + private IndicesOptions indicesOptions; public Builder(String id, String jobId) { this.id = Objects.requireNonNull(id, ID.getPreferredName()); @@ -308,6 +326,7 @@ public class DatafeedConfig implements ToXContentObject { this.chunkingConfig = config.chunkingConfig; this.delayedDataCheckConfig = config.getDelayedDataCheckConfig(); this.maxEmptySearches = config.getMaxEmptySearches(); + this.indicesOptions = config.indicesOptions; } public Builder setIndices(List indices) { @@ -395,9 +414,14 @@ public class DatafeedConfig implements ToXContentObject { return this; } + public Builder setIndicesOptions(IndicesOptions indicesOptions) { + this.indicesOptions = indicesOptions; + return this; + } + public DatafeedConfig build() { return new DatafeedConfig(id, jobId, queryDelay, frequency, indices, query, aggregations, scriptFields, scrollSize, - chunkingConfig, delayedDataCheckConfig, maxEmptySearches); + chunkingConfig, delayedDataCheckConfig, maxEmptySearches, indicesOptions); } private static BytesReference xContentToBytes(ToXContentObject object) throws IOException { diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/datafeed/DatafeedUpdate.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/datafeed/DatafeedUpdate.java index e11e2e1d9b3..3acdda37615 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/datafeed/DatafeedUpdate.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/datafeed/DatafeedUpdate.java @@ -19,6 +19,7 @@ package org.elasticsearch.client.ml.datafeed; import org.elasticsearch.client.ml.job.config.Job; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; @@ -80,6 +81,9 @@ public class DatafeedUpdate implements ToXContentObject { DelayedDataCheckConfig.PARSER, DatafeedConfig.DELAYED_DATA_CHECK_CONFIG); PARSER.declareInt(Builder::setMaxEmptySearches, DatafeedConfig.MAX_EMPTY_SEARCHES); + PARSER.declareObject(Builder::setIndicesOptions, + (p, c) -> IndicesOptions.fromMap(p.map(), new IndicesOptions(IndicesOptions.Option.NONE, IndicesOptions.WildcardStates.NONE)), + DatafeedConfig.INDICES_OPTIONS); } private static BytesReference parseBytes(XContentParser parser) throws IOException { @@ -100,11 +104,12 @@ public class DatafeedUpdate implements ToXContentObject { private final ChunkingConfig chunkingConfig; private final DelayedDataCheckConfig delayedDataCheckConfig; private final Integer maxEmptySearches; + private final IndicesOptions indicesOptions; private DatafeedUpdate(String id, String jobId, TimeValue queryDelay, TimeValue frequency, List indices, BytesReference query, BytesReference aggregations, List scriptFields, Integer scrollSize, ChunkingConfig chunkingConfig, DelayedDataCheckConfig delayedDataCheckConfig, - Integer maxEmptySearches) { + Integer maxEmptySearches, IndicesOptions indicesOptions) { this.id = id; this.jobId = jobId; this.queryDelay = queryDelay; @@ -117,6 +122,7 @@ public class DatafeedUpdate implements ToXContentObject { this.chunkingConfig = chunkingConfig; this.delayedDataCheckConfig = delayedDataCheckConfig; this.maxEmptySearches = maxEmptySearches; + this.indicesOptions = indicesOptions; } /** @@ -157,6 +163,11 @@ public class DatafeedUpdate implements ToXContentObject { addOptionalField(builder, DatafeedConfig.SCROLL_SIZE, scrollSize); addOptionalField(builder, DatafeedConfig.CHUNKING_CONFIG, chunkingConfig); addOptionalField(builder, DatafeedConfig.MAX_EMPTY_SEARCHES, maxEmptySearches); + if (indicesOptions != null) { + builder.startObject(DatafeedConfig.INDICES_OPTIONS.getPreferredName()); + indicesOptions.toXContent(builder, params); + builder.endObject(); + } builder.endObject(); return builder; } @@ -211,6 +222,10 @@ public class DatafeedUpdate implements ToXContentObject { return maxEmptySearches; } + public IndicesOptions getIndicesOptions() { + return indicesOptions; + } + private static Map asMap(BytesReference bytesReference) { return bytesReference == null ? null : XContentHelper.convertToMap(bytesReference, true, XContentType.JSON).v2(); } @@ -247,7 +262,8 @@ public class DatafeedUpdate implements ToXContentObject { && Objects.equals(this.delayedDataCheckConfig, that.delayedDataCheckConfig) && Objects.equals(this.scriptFields, that.scriptFields) && Objects.equals(this.chunkingConfig, that.chunkingConfig) - && Objects.equals(this.maxEmptySearches, that.maxEmptySearches); + && Objects.equals(this.maxEmptySearches, that.maxEmptySearches) + && Objects.equals(this.indicesOptions, that.indicesOptions); } /** @@ -258,7 +274,7 @@ public class DatafeedUpdate implements ToXContentObject { @Override public int hashCode() { return Objects.hash(id, jobId, frequency, queryDelay, indices, asMap(query), scrollSize, asMap(aggregations), scriptFields, - chunkingConfig, delayedDataCheckConfig, maxEmptySearches); + chunkingConfig, delayedDataCheckConfig, maxEmptySearches, indicesOptions); } public static Builder builder(String id) { @@ -279,6 +295,7 @@ public class DatafeedUpdate implements ToXContentObject { private ChunkingConfig chunkingConfig; private DelayedDataCheckConfig delayedDataCheckConfig; private Integer maxEmptySearches; + private IndicesOptions indicesOptions; public Builder(String id) { this.id = Objects.requireNonNull(id, DatafeedConfig.ID.getPreferredName()); @@ -297,6 +314,7 @@ public class DatafeedUpdate implements ToXContentObject { this.chunkingConfig = config.chunkingConfig; this.delayedDataCheckConfig = config.delayedDataCheckConfig; this.maxEmptySearches = config.maxEmptySearches; + this.indicesOptions = config.indicesOptions; } @Deprecated @@ -381,9 +399,14 @@ public class DatafeedUpdate implements ToXContentObject { return this; } + public Builder setIndicesOptions(IndicesOptions indicesOptions) { + this.indicesOptions = indicesOptions; + return this; + } + public DatafeedUpdate build() { return new DatafeedUpdate(id, jobId, queryDelay, frequency, indices, query, aggregations, scriptFields, scrollSize, - chunkingConfig, delayedDataCheckConfig, maxEmptySearches); + chunkingConfig, delayedDataCheckConfig, maxEmptySearches, indicesOptions); } private static BytesReference xContentToBytes(ToXContentObject object) throws IOException { diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/datafeed/DatafeedConfigTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/datafeed/DatafeedConfigTests.java index 7f7a03ab2e1..16bd25bb492 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/datafeed/DatafeedConfigTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/datafeed/DatafeedConfigTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.client.ml.datafeed; import com.carrotsearch.randomizedtesting.generators.CodepointSetGenerator; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.DeprecationHandler; import org.elasticsearch.common.xcontent.NamedXContentRegistry; @@ -109,6 +110,13 @@ public class DatafeedConfigTests extends AbstractXContentTestCase { public static class Request extends AcknowledgedRequest implements ToXContentObject { - public static Request parseRequest(String datafeedId, XContentParser parser) { + public static Request parseRequest(String datafeedId, IndicesOptions indicesOptions, XContentParser parser) { DatafeedConfig.Builder datafeed = DatafeedConfig.STRICT_PARSER.apply(parser, null); + if (datafeed.getIndicesOptions() == null) { + datafeed.setIndicesOptions(indicesOptions); + } datafeed.setId(datafeedId); return new Request(datafeed.build()); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StartDatafeedAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StartDatafeedAction.java index 33102cf93a9..d9676a52674 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StartDatafeedAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StartDatafeedAction.java @@ -11,6 +11,8 @@ import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.ValidateActions; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.MasterNodeRequest; import org.elasticsearch.client.ElasticsearchClient; @@ -150,6 +152,9 @@ public class StartDatafeedAction extends ActionType { params.setTimeout(TimeValue.parseTimeValue(val, TIMEOUT.getPreferredName())), TIMEOUT); PARSER.declareString(DatafeedParams::setJobId, Job.ID); PARSER.declareStringArray(DatafeedParams::setDatafeedIndices, INDICES); + PARSER.declareObject(DatafeedParams::setIndicesOptions, + (p, c) -> IndicesOptions.fromMap(p.map(), SearchRequest.DEFAULT_INDICES_OPTIONS), + DatafeedConfig.INDICES_OPTIONS); } static long parseDateOrThrow(String date, ParseField paramName, LongSupplier now) { @@ -193,6 +198,11 @@ public class StartDatafeedAction extends ActionType { jobId = in.readOptionalString(); datafeedIndices = in.readStringList(); } + if (in.getVersion().onOrAfter(Version.V_7_7_0)) { + indicesOptions = IndicesOptions.readIndicesOptions(in); + } else { + indicesOptions = SearchRequest.DEFAULT_INDICES_OPTIONS; + } } DatafeedParams() { @@ -204,6 +214,7 @@ public class StartDatafeedAction extends ActionType { private TimeValue timeout = TimeValue.timeValueSeconds(20); private List datafeedIndices = Collections.emptyList(); private String jobId; + private IndicesOptions indicesOptions = SearchRequest.DEFAULT_INDICES_OPTIONS; public String getDatafeedId() { @@ -250,6 +261,15 @@ public class StartDatafeedAction extends ActionType { this.datafeedIndices = datafeedIndices; } + public IndicesOptions getIndicesOptions() { + return indicesOptions; + } + + public DatafeedParams setIndicesOptions(IndicesOptions indicesOptions) { + this.indicesOptions = ExceptionsHelper.requireNonNull(indicesOptions, DatafeedConfig.INDICES_OPTIONS); + return this; + } + @Override public String getWriteableName() { return MlTasks.DATAFEED_TASK_NAME; @@ -270,6 +290,9 @@ public class StartDatafeedAction extends ActionType { out.writeOptionalString(jobId); out.writeStringCollection(datafeedIndices); } + if (out.getVersion().onOrAfter(Version.V_7_7_0)) { + indicesOptions.writeIndicesOptions(out); + } } @Override @@ -287,13 +310,18 @@ public class StartDatafeedAction extends ActionType { if (datafeedIndices.isEmpty() == false) { builder.field(INDICES.getPreferredName(), datafeedIndices); } + + builder.startObject(DatafeedConfig.INDICES_OPTIONS.getPreferredName()); + indicesOptions.toXContent(builder, params); + builder.endObject(); + builder.endObject(); return builder; } @Override public int hashCode() { - return Objects.hash(datafeedId, startTime, endTime, timeout, jobId, datafeedIndices); + return Objects.hash(datafeedId, startTime, endTime, timeout, jobId, datafeedIndices, indicesOptions); } @Override @@ -310,6 +338,7 @@ public class StartDatafeedAction extends ActionType { Objects.equals(endTime, other.endTime) && Objects.equals(timeout, other.timeout) && Objects.equals(jobId, other.jobId) && + Objects.equals(indicesOptions, other.indicesOptions) && Objects.equals(datafeedIndices, other.datafeedIndices); } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateDatafeedAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateDatafeedAction.java index 5d82a216626..a5aebc61df2 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateDatafeedAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateDatafeedAction.java @@ -7,9 +7,11 @@ package org.elasticsearch.xpack.core.ml.action; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.master.AcknowledgedRequest; import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder; import org.elasticsearch.client.ElasticsearchClient; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.ToXContentObject; @@ -31,8 +33,11 @@ public class UpdateDatafeedAction extends ActionType public static class Request extends AcknowledgedRequest implements ToXContentObject { - public static Request parseRequest(String datafeedId, XContentParser parser) { + public static Request parseRequest(String datafeedId, @Nullable IndicesOptions indicesOptions, XContentParser parser) { DatafeedUpdate.Builder update = DatafeedUpdate.PARSER.apply(parser, null); + if (indicesOptions != null) { + update.setIndicesOptions(indicesOptions); + } update.setId(datafeedId); return new Request(update.build()); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java index 9c0f46e0fea..b7621ba7581 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java @@ -9,6 +9,8 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.AbstractDiffable; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.Strings; @@ -92,6 +94,7 @@ public class DatafeedConfig extends AbstractDiffable implements public static final ParseField HEADERS = new ParseField("headers"); public static final ParseField DELAYED_DATA_CHECK_CONFIG = new ParseField("delayed_data_check_config"); public static final ParseField MAX_EMPTY_SEARCHES = new ParseField("max_empty_searches"); + public static final ParseField INDICES_OPTIONS = new ParseField("indices_options"); // These parsers follow the pattern that metadata is parsed leniently (to allow for enhancements), whilst config is parsed strictly public static final ObjectParser LENIENT_PARSER = createParser(true); @@ -154,6 +157,9 @@ public class DatafeedConfig extends AbstractDiffable implements ignoreUnknownFields ? DelayedDataCheckConfig.LENIENT_PARSER : DelayedDataCheckConfig.STRICT_PARSER, DELAYED_DATA_CHECK_CONFIG); parser.declareInt(Builder::setMaxEmptySearches, MAX_EMPTY_SEARCHES); + parser.declareObject(Builder::setIndicesOptions, + (p, c) -> IndicesOptions.fromMap(p.map(), SearchRequest.DEFAULT_INDICES_OPTIONS), + INDICES_OPTIONS); return parser; } @@ -179,11 +185,12 @@ public class DatafeedConfig extends AbstractDiffable implements private final Map headers; private final DelayedDataCheckConfig delayedDataCheckConfig; private final Integer maxEmptySearches; + private final IndicesOptions indicesOptions; private DatafeedConfig(String id, String jobId, TimeValue queryDelay, TimeValue frequency, List indices, QueryProvider queryProvider, AggProvider aggProvider, List scriptFields, Integer scrollSize, ChunkingConfig chunkingConfig, Map headers, - DelayedDataCheckConfig delayedDataCheckConfig, Integer maxEmptySearches) { + DelayedDataCheckConfig delayedDataCheckConfig, Integer maxEmptySearches, IndicesOptions indicesOptions) { this.id = id; this.jobId = jobId; this.queryDelay = queryDelay; @@ -197,6 +204,7 @@ public class DatafeedConfig extends AbstractDiffable implements this.headers = Collections.unmodifiableMap(headers); this.delayedDataCheckConfig = delayedDataCheckConfig; this.maxEmptySearches = maxEmptySearches; + this.indicesOptions = ExceptionsHelper.requireNonNull(indicesOptions, INDICES_OPTIONS); } public DatafeedConfig(StreamInput in) throws IOException { @@ -242,6 +250,11 @@ public class DatafeedConfig extends AbstractDiffable implements } else { maxEmptySearches = null; } + if (in.getVersion().onOrAfter(Version.V_7_7_0)) { + indicesOptions = IndicesOptions.readIndicesOptions(in); + } else { + indicesOptions = SearchRequest.DEFAULT_INDICES_OPTIONS; + } } /** @@ -414,6 +427,10 @@ public class DatafeedConfig extends AbstractDiffable implements return maxEmptySearches; } + public IndicesOptions getIndicesOptions() { + return indicesOptions; + } + @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(id); @@ -455,6 +472,9 @@ public class DatafeedConfig extends AbstractDiffable implements if (out.getVersion().onOrAfter(Version.V_7_5_0)) { out.writeOptionalVInt(maxEmptySearches); } + if (out.getVersion().onOrAfter(Version.V_7_7_0)) { + indicesOptions.writeIndicesOptions(out); + } } @Override @@ -494,6 +514,10 @@ public class DatafeedConfig extends AbstractDiffable implements if (maxEmptySearches != null) { builder.field(MAX_EMPTY_SEARCHES.getPreferredName(), maxEmptySearches); } + builder.startObject(INDICES_OPTIONS.getPreferredName()); + indicesOptions.toXContent(builder, params); + builder.endObject(); + builder.endObject(); return builder; } @@ -527,13 +551,14 @@ public class DatafeedConfig extends AbstractDiffable implements && Objects.equals(this.chunkingConfig, that.chunkingConfig) && Objects.equals(this.headers, that.headers) && Objects.equals(this.delayedDataCheckConfig, that.delayedDataCheckConfig) - && Objects.equals(this.maxEmptySearches, that.maxEmptySearches); + && Objects.equals(this.maxEmptySearches, that.maxEmptySearches) + && Objects.equals(this.indicesOptions, that.indicesOptions); } @Override public int hashCode() { return Objects.hash(id, jobId, frequency, queryDelay, indices, queryProvider, scrollSize, aggProvider, scriptFields, chunkingConfig, - headers, delayedDataCheckConfig, maxEmptySearches); + headers, delayedDataCheckConfig, maxEmptySearches, indicesOptions); } @Override @@ -607,6 +632,7 @@ public class DatafeedConfig extends AbstractDiffable implements private Map headers = Collections.emptyMap(); private DelayedDataCheckConfig delayedDataCheckConfig = DelayedDataCheckConfig.defaultDelayedDataCheckConfig(); private Integer maxEmptySearches; + private IndicesOptions indicesOptions; public Builder() { } @@ -630,6 +656,7 @@ public class DatafeedConfig extends AbstractDiffable implements this.headers = new HashMap<>(config.headers); this.delayedDataCheckConfig = config.getDelayedDataCheckConfig(); this.maxEmptySearches = config.getMaxEmptySearches(); + this.indicesOptions = config.indicesOptions; } public void setId(String datafeedId) { @@ -735,6 +762,15 @@ public class DatafeedConfig extends AbstractDiffable implements } } + public Builder setIndicesOptions(IndicesOptions indicesOptions) { + this.indicesOptions = indicesOptions; + return this; + } + + public IndicesOptions getIndicesOptions() { + return this.indicesOptions; + } + public DatafeedConfig build() { ExceptionsHelper.requireNonNull(id, ID.getPreferredName()); ExceptionsHelper.requireNonNull(jobId, Job.ID.getPreferredName()); @@ -749,8 +785,11 @@ public class DatafeedConfig extends AbstractDiffable implements setDefaultChunkingConfig(); setDefaultQueryDelay(); + if (indicesOptions == null) { + indicesOptions = SearchRequest.DEFAULT_INDICES_OPTIONS; + } return new DatafeedConfig(id, jobId, queryDelay, frequency, indices, queryProvider, aggProvider, scriptFields, scrollSize, - chunkingConfig, headers, delayedDataCheckConfig, maxEmptySearches); + chunkingConfig, headers, delayedDataCheckConfig, maxEmptySearches, indicesOptions); } void validateScriptFields() { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java index f9b590730cd..2fedc1f127c 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java @@ -7,6 +7,8 @@ package org.elasticsearch.xpack.core.ml.datafeed; import org.apache.logging.log4j.LogManager; import org.elasticsearch.Version; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; @@ -82,6 +84,9 @@ public class DatafeedUpdate implements Writeable, ToXContentObject { DelayedDataCheckConfig.STRICT_PARSER, DatafeedConfig.DELAYED_DATA_CHECK_CONFIG); PARSER.declareInt(Builder::setMaxEmptySearches, DatafeedConfig.MAX_EMPTY_SEARCHES); + PARSER.declareObject(Builder::setIndicesOptions, + (p, c) -> IndicesOptions.fromMap(p.map(), SearchRequest.DEFAULT_INDICES_OPTIONS), + DatafeedConfig.INDICES_OPTIONS); } private final String id; @@ -96,12 +101,13 @@ public class DatafeedUpdate implements Writeable, ToXContentObject { private final ChunkingConfig chunkingConfig; private final DelayedDataCheckConfig delayedDataCheckConfig; private final Integer maxEmptySearches; + private final IndicesOptions indicesOptions; private DatafeedUpdate(String id, String jobId, TimeValue queryDelay, TimeValue frequency, List indices, QueryProvider queryProvider, AggProvider aggProvider, List scriptFields, Integer scrollSize, ChunkingConfig chunkingConfig, DelayedDataCheckConfig delayedDataCheckConfig, - Integer maxEmptySearches) { + Integer maxEmptySearches, IndicesOptions indicesOptions) { this.id = id; this.jobId = jobId; this.queryDelay = queryDelay; @@ -114,6 +120,7 @@ public class DatafeedUpdate implements Writeable, ToXContentObject { this.chunkingConfig = chunkingConfig; this.delayedDataCheckConfig = delayedDataCheckConfig; this.maxEmptySearches = maxEmptySearches; + this.indicesOptions = indicesOptions; } public DatafeedUpdate(StreamInput in) throws IOException { @@ -156,6 +163,11 @@ public class DatafeedUpdate implements Writeable, ToXContentObject { } else { maxEmptySearches = null; } + if (in.getVersion().onOrAfter(Version.V_7_7_0)) { + indicesOptions = in.readBoolean() ? IndicesOptions.readIndicesOptions(in) : null; + } else { + indicesOptions = null; + } } /** @@ -204,6 +216,14 @@ public class DatafeedUpdate implements Writeable, ToXContentObject { if (out.getVersion().onOrAfter(Version.V_7_5_0)) { out.writeOptionalInt(maxEmptySearches); } + if (out.getVersion().onOrAfter(Version.V_7_7_0)) { + if (indicesOptions != null) { + out.writeBoolean(true); + indicesOptions.writeIndicesOptions(out); + } else { + out.writeBoolean(false); + } + } } @Override @@ -235,7 +255,11 @@ public class DatafeedUpdate implements Writeable, ToXContentObject { addOptionalField(builder, DatafeedConfig.CHUNKING_CONFIG, chunkingConfig); addOptionalField(builder, DatafeedConfig.DELAYED_DATA_CHECK_CONFIG, delayedDataCheckConfig); addOptionalField(builder, DatafeedConfig.MAX_EMPTY_SEARCHES, maxEmptySearches); - + if (indicesOptions != null) { + builder.startObject(DatafeedConfig.INDICES_OPTIONS.getPreferredName()); + indicesOptions.toXContent(builder, params); + builder.endObject(); + } builder.endObject(); return builder; } @@ -308,6 +332,10 @@ public class DatafeedUpdate implements Writeable, ToXContentObject { return maxEmptySearches; } + public IndicesOptions getIndicesOptions() { + return indicesOptions; + } + /** * Applies the update to the given {@link DatafeedConfig} * @return a new {@link DatafeedConfig} that contains the update @@ -355,6 +383,9 @@ public class DatafeedUpdate implements Writeable, ToXContentObject { if (maxEmptySearches != null) { builder.setMaxEmptySearches(maxEmptySearches); } + if (indicesOptions != null) { + builder.setIndicesOptions(indicesOptions); + } if (headers.isEmpty() == false) { // Adjust the request, adding security headers from the current thread context @@ -395,13 +426,14 @@ public class DatafeedUpdate implements Writeable, ToXContentObject { && Objects.equals(this.delayedDataCheckConfig, that.delayedDataCheckConfig) && Objects.equals(this.scriptFields, that.scriptFields) && Objects.equals(this.chunkingConfig, that.chunkingConfig) - && Objects.equals(this.maxEmptySearches, that.maxEmptySearches); + && Objects.equals(this.maxEmptySearches, that.maxEmptySearches) + && Objects.equals(this.indicesOptions, that.indicesOptions); } @Override public int hashCode() { return Objects.hash(id, jobId, frequency, queryDelay, indices, queryProvider, scrollSize, aggProvider, scriptFields, chunkingConfig, - delayedDataCheckConfig, maxEmptySearches); + delayedDataCheckConfig, maxEmptySearches, indicesOptions); } @Override @@ -420,7 +452,8 @@ public class DatafeedUpdate implements Writeable, ToXContentObject { && (delayedDataCheckConfig == null || Objects.equals(delayedDataCheckConfig, datafeed.getDelayedDataCheckConfig())) && (chunkingConfig == null || Objects.equals(chunkingConfig, datafeed.getChunkingConfig())) && (maxEmptySearches == null || Objects.equals(maxEmptySearches, datafeed.getMaxEmptySearches()) - || (maxEmptySearches == -1 && datafeed.getMaxEmptySearches() == null)); + || (maxEmptySearches == -1 && datafeed.getMaxEmptySearches() == null)) + && (indicesOptions == null || Objects.equals(indicesOptions, datafeed.getIndicesOptions())); } public static class Builder { @@ -437,6 +470,7 @@ public class DatafeedUpdate implements Writeable, ToXContentObject { private ChunkingConfig chunkingConfig; private DelayedDataCheckConfig delayedDataCheckConfig; private Integer maxEmptySearches; + private IndicesOptions indicesOptions; public Builder() { } @@ -458,6 +492,7 @@ public class DatafeedUpdate implements Writeable, ToXContentObject { this.chunkingConfig = config.chunkingConfig; this.delayedDataCheckConfig = config.delayedDataCheckConfig; this.maxEmptySearches = config.maxEmptySearches; + this.indicesOptions = config.indicesOptions; } public Builder setId(String datafeedId) { @@ -535,9 +570,14 @@ public class DatafeedUpdate implements Writeable, ToXContentObject { return this; } + public Builder setIndicesOptions(IndicesOptions indicesOptions) { + this.indicesOptions = indicesOptions; + return this; + } + public DatafeedUpdate build() { return new DatafeedUpdate(id, jobId, queryDelay, frequency, indices, queryProvider, aggProvider, scriptFields, scrollSize, - chunkingConfig, delayedDataCheckConfig, maxEmptySearches); + chunkingConfig, delayedDataCheckConfig, maxEmptySearches, indicesOptions); } } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/ReservedFieldNames.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/ReservedFieldNames.java index 890ecbb87e5..c0b915f8632 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/ReservedFieldNames.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/ReservedFieldNames.java @@ -287,6 +287,7 @@ public final class ReservedFieldNames { DatafeedConfig.CHUNKING_CONFIG.getPreferredName(), DatafeedConfig.HEADERS.getPreferredName(), DatafeedConfig.DELAYED_DATA_CHECK_CONFIG.getPreferredName(), + DatafeedConfig.INDICES_OPTIONS.getPreferredName(), DelayedDataCheckConfig.ENABLED.getPreferredName(), DelayedDataCheckConfig.CHECK_WINDOW.getPreferredName(), diff --git a/x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/config_index_mappings.json b/x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/config_index_mappings.json index aa7211cf448..ec1daf22e35 100644 --- a/x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/config_index_mappings.json +++ b/x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/config_index_mappings.json @@ -290,6 +290,10 @@ "indices" : { "type" : "keyword" }, + "indices_options": { + "type" : "object", + "enabled" : false + }, "job_id" : { "type" : "keyword" }, diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/PutDatafeedActionRequestTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/PutDatafeedActionRequestTests.java index dfbc7e7a55a..633e54a479f 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/PutDatafeedActionRequestTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/PutDatafeedActionRequestTests.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.core.ml.action; +import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Settings; @@ -13,7 +14,6 @@ import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.search.SearchModule; import org.elasticsearch.test.AbstractSerializingTestCase; import org.elasticsearch.xpack.core.ml.action.PutDatafeedAction.Request; -import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfigTests; import org.junit.Before; @@ -30,9 +30,7 @@ public class PutDatafeedActionRequestTests extends AbstractSerializingTestCase headers = new HashMap<>(); @@ -843,7 +856,7 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase jobTask; private final ClusterState clusterState; private final IndexNameExpressionResolver resolver; + private final IndicesOptions indicesOptions; public DatafeedNodeSelector(ClusterState clusterState, IndexNameExpressionResolver resolver, String datafeedId, - String jobId, List datafeedIndices) { + String jobId, List datafeedIndices, IndicesOptions indicesOptions) { PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); this.datafeedId = datafeedId; this.jobId = jobId; @@ -47,6 +49,7 @@ public class DatafeedNodeSelector { this.jobTask = MlTasks.getJobTask(jobId, tasks); this.clusterState = Objects.requireNonNull(clusterState); this.resolver = Objects.requireNonNull(resolver); + this.indicesOptions = Objects.requireNonNull(indicesOptions); } public void checkDatafeedTaskCanBeCreated() { @@ -117,25 +120,28 @@ public class DatafeedNodeSelector { } String[] concreteIndices; - String reason = "cannot start datafeed [" + datafeedId + "] because index [" - + index + "] does not exist, is closed, or is still initializing."; try { - concreteIndices = resolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpen(), index); + concreteIndices = resolver.concreteIndexNames(clusterState, indicesOptions, index); if (concreteIndices.length == 0) { - return new AssignmentFailure(reason, true); + return new AssignmentFailure("cannot start datafeed [" + datafeedId + "] because index [" + + index + "] does not exist, is closed, or is still initializing.", true); } } catch (Exception e) { - LOGGER.debug(reason, e); - return new AssignmentFailure(reason, true); + String msg = new ParameterizedMessage("failed resolving indices given [{}] and indices_options [{}]", + index, + indicesOptions).getFormattedMessage(); + LOGGER.debug("[" + datafeedId + "] " + msg, e); + return new AssignmentFailure( + "cannot start datafeed [" + datafeedId + "] because it " + msg + " with exception [" + e.getMessage() + "]", + true); } for (String concreteIndex : concreteIndices) { IndexRoutingTable routingTable = clusterState.getRoutingTable().index(concreteIndex); if (routingTable == null || !routingTable.allPrimaryShardsActive()) { - reason = "cannot start datafeed [" + datafeedId + "] because index [" - + concreteIndex + "] does not have all primary shards active yet."; - return new AssignmentFailure(reason, false); + return new AssignmentFailure("cannot start datafeed [" + datafeedId + "] because index [" + + concreteIndex + "] does not have all primary shards active yet.", false); } } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/delayeddatacheck/DatafeedDelayedDataDetector.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/delayeddatacheck/DatafeedDelayedDataDetector.java index d2a933ffa46..0fa4411d839 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/delayeddatacheck/DatafeedDelayedDataDetector.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/delayeddatacheck/DatafeedDelayedDataDetector.java @@ -8,6 +8,7 @@ package org.elasticsearch.xpack.ml.datafeed.delayeddatacheck; import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.Client; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.index.query.QueryBuilder; @@ -27,6 +28,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.stream.Collectors; import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; @@ -46,15 +48,17 @@ public class DatafeedDelayedDataDetector implements DelayedDataDetector { private final String jobId; private final QueryBuilder datafeedQuery; private final String[] datafeedIndices; + private final IndicesOptions indicesOptions; DatafeedDelayedDataDetector(long bucketSpan, long window, String jobId, String timeField, QueryBuilder datafeedQuery, - String[] datafeedIndices, Client client) { + String[] datafeedIndices, IndicesOptions indicesOptions, Client client) { this.bucketSpan = bucketSpan; this.window = window; this.jobId = jobId; this.timeField = timeField; this.datafeedQuery = datafeedQuery; this.datafeedIndices = datafeedIndices; + this.indicesOptions = Objects.requireNonNull(indicesOptions); this.client = client; } @@ -115,7 +119,7 @@ public class DatafeedDelayedDataDetector implements DelayedDataDetector { .fixedInterval(new DateHistogramInterval(bucketSpan + "ms")).field(timeField)) .query(ExtractorUtils.wrapInTimeRangeQuery(datafeedQuery, timeField, start, end)); - SearchRequest searchRequest = new SearchRequest(datafeedIndices).source(searchSourceBuilder); + SearchRequest searchRequest = new SearchRequest(datafeedIndices).source(searchSourceBuilder).indicesOptions(indicesOptions); try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashWithOrigin(ML_ORIGIN)) { SearchResponse response = client.execute(SearchAction.INSTANCE, searchRequest).actionGet(); List buckets = ((Histogram)response.getAggregations().get(DATE_BUCKETS)).getBuckets(); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/delayeddatacheck/DelayedDataDetectorFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/delayeddatacheck/DelayedDataDetectorFactory.java index 88f8e6caadf..23a5de6baa0 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/delayeddatacheck/DelayedDataDetectorFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/delayeddatacheck/DelayedDataDetectorFactory.java @@ -51,6 +51,7 @@ public class DelayedDataDetectorFactory { job.getDataDescription().getTimeField(), datafeedConfig.getParsedQuery(xContentRegistry), datafeedConfig.getIndices().toArray(new String[0]), + datafeedConfig.getIndicesOptions(), client); } else { return new NullDelayedDataDetector(); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/DataExtractorFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/DataExtractorFactory.java index d43ede48d05..5863a4e05fd 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/DataExtractorFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/DataExtractorFactory.java @@ -79,7 +79,7 @@ public interface DataExtractorFactory { client, ClientHelper.ML_ORIGIN, GetRollupIndexCapsAction.INSTANCE, - new GetRollupIndexCapsAction.Request(datafeed.getIndices().toArray(new String[0])), + new GetRollupIndexCapsAction.Request(datafeed.getIndices().toArray(new String[0]), datafeed.getIndicesOptions()), getRollupIndexCapsActionHandler); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractor.java index 031db35ebac..aea9c9d7a75 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractor.java @@ -28,6 +28,7 @@ class AggregationDataExtractor extends AbstractAggregationDataExtractor headers; + final IndicesOptions indicesOptions; AggregationDataExtractorContext(String jobId, String timeField, Set fields, List indices, QueryBuilder query, AggregatorFactories.Builder aggs, long start, long end, boolean includeDocCount, - Map headers) { + Map headers, IndicesOptions indicesOptions) { this.jobId = Objects.requireNonNull(jobId); this.timeField = Objects.requireNonNull(timeField); this.fields = Objects.requireNonNull(fields); @@ -39,5 +41,6 @@ class AggregationDataExtractorContext { this.end = end; this.includeDocCount = includeDocCount; this.headers = headers; + this.indicesOptions = Objects.requireNonNull(indicesOptions); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorFactory.java index 40d186542df..93c5a12d82d 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorFactory.java @@ -50,7 +50,8 @@ public class AggregationDataExtractorFactory implements DataExtractorFactory { Intervals.alignToCeil(start, histogramInterval), Intervals.alignToFloor(end, histogramInterval), job.getAnalysisConfig().getSummaryCountFieldName().equals(DatafeedConfig.DOC_COUNT), - datafeedConfig.getHeaders()); + datafeedConfig.getHeaders(), + datafeedConfig.getIndicesOptions()); return new AggregationDataExtractor(client, dataExtractorContext, timingStatsReporter); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/RollupDataExtractor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/RollupDataExtractor.java index 93411610577..75b215e34c5 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/RollupDataExtractor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/RollupDataExtractor.java @@ -26,7 +26,9 @@ class RollupDataExtractor extends AbstractAggregationDataExtractor headers; final boolean hasAggregations; final Long histogramInterval; + final IndicesOptions indicesOptions; ChunkedDataExtractorContext(String jobId, String timeField, List indices, QueryBuilder query, int scrollSize, long start, long end, @Nullable TimeValue chunkSpan, TimeAligner timeAligner, Map headers, - boolean hasAggregations, @Nullable Long histogramInterval) { + boolean hasAggregations, @Nullable Long histogramInterval, IndicesOptions indicesOptions) { this.jobId = Objects.requireNonNull(jobId); this.timeField = Objects.requireNonNull(timeField); this.indices = indices.toArray(new String[indices.size()]); @@ -48,5 +50,6 @@ class ChunkedDataExtractorContext { this.headers = headers; this.hasAggregations = hasAggregations; this.histogramInterval = histogramInterval; + this.indicesOptions = Objects.requireNonNull(indicesOptions); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorFactory.java index 028409f6939..e153bafa096 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorFactory.java @@ -54,7 +54,8 @@ public class ChunkedDataExtractorFactory implements DataExtractorFactory { timeAligner, datafeedConfig.getHeaders(), datafeedConfig.hasAggregations(), - datafeedConfig.hasAggregations() ? datafeedConfig.getHistogramIntervalMillis(xContentRegistry) : null + datafeedConfig.hasAggregations() ? datafeedConfig.getHistogramIntervalMillis(xContentRegistry) : null, + datafeedConfig.getIndicesOptions() ); return new ChunkedDataExtractor(client, dataExtractorFactory, dataExtractorContext, timingStatsReporter); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractor.java index e0bf14b1bb1..8745ee49a11 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractor.java @@ -125,6 +125,7 @@ class ScrollDataExtractor implements DataExtractor { .setScroll(SCROLL_TIMEOUT) .addSort(context.extractedFields.timeField(), SortOrder.ASC) .setIndices(context.indices) + .setIndicesOptions(context.indicesOptions) .setSize(context.scrollSize) .setQuery(ExtractorUtils.wrapInTimeRangeQuery( context.query, context.extractedFields.timeField(), start, context.end)); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorContext.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorContext.java index 19dbfa669b1..7ec6b3b65c3 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorContext.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorContext.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.ml.datafeed.extractor.scroll; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder; @@ -23,10 +24,11 @@ class ScrollDataExtractorContext { final long start; final long end; final Map headers; + final IndicesOptions indicesOptions; ScrollDataExtractorContext(String jobId, TimeBasedExtractedFields extractedFields, List indices, QueryBuilder query, List scriptFields, int scrollSize, long start, long end, - Map headers) { + Map headers, IndicesOptions indicesOptions) { this.jobId = Objects.requireNonNull(jobId); this.extractedFields = Objects.requireNonNull(extractedFields); this.indices = indices.toArray(new String[indices.size()]); @@ -36,5 +38,6 @@ class ScrollDataExtractorContext { this.start = start; this.end = end; this.headers = headers; + this.indicesOptions = Objects.requireNonNull(indicesOptions); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorFactory.java index 0c626ea7e22..7bc995df5b5 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorFactory.java @@ -25,7 +25,6 @@ import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory; import java.util.Objects; public class ScrollDataExtractorFactory implements DataExtractorFactory { - private final Client client; private final DatafeedConfig datafeedConfig; private final Job job; @@ -54,7 +53,9 @@ public class ScrollDataExtractorFactory implements DataExtractorFactory { datafeedConfig.getScrollSize(), start, end, - datafeedConfig.getHeaders()); + datafeedConfig.getHeaders(), + datafeedConfig.getIndicesOptions() + ); return new ScrollDataExtractor(client, dataExtractorContext, timingStatsReporter); } @@ -87,11 +88,13 @@ public class ScrollDataExtractorFactory implements DataExtractorFactory { // Step 1. Get field capabilities necessary to build the information of how to extract fields FieldCapabilitiesRequest fieldCapabilitiesRequest = new FieldCapabilitiesRequest(); - fieldCapabilitiesRequest.indices(datafeed.getIndices().toArray(new String[datafeed.getIndices().size()])); + fieldCapabilitiesRequest.indices(datafeed.getIndices().toArray(new String[0])).indicesOptions(datafeed.getIndicesOptions()); // We need capabilities for all fields matching the requested fields' parents so that we can work around // multi-fields that are not in source. - String[] requestFields = job.allInputFields().stream().map(f -> MlStrings.getParentField(f) + "*") - .toArray(size -> new String[size]); + String[] requestFields = job.allInputFields() + .stream() + .map(f -> MlStrings.getParentField(f) + "*") + .toArray(String[]::new); fieldCapabilitiesRequest.fields(requestFields); ClientHelper. executeWithHeaders(datafeed.getHeaders(), ClientHelper.ML_ORIGIN, client, () -> { client.execute(FieldCapabilitiesAction.INSTANCE, fieldCapabilitiesRequest, fieldCapabilitiesHandler); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestPutDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestPutDatafeedAction.java index 8b71e137bea..4b79abdfef6 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestPutDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestPutDatafeedAction.java @@ -5,6 +5,8 @@ */ package org.elasticsearch.xpack.ml.rest.datafeeds; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.rest.BaseRestHandler; @@ -44,8 +46,9 @@ public class RestPutDatafeedAction extends BaseRestHandler { @Override protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { String datafeedId = restRequest.param(DatafeedConfig.ID.getPreferredName()); + IndicesOptions indicesOptions = IndicesOptions.fromRequest(restRequest, SearchRequest.DEFAULT_INDICES_OPTIONS); XContentParser parser = restRequest.contentParser(); - PutDatafeedAction.Request putDatafeedRequest = PutDatafeedAction.Request.parseRequest(datafeedId, parser); + PutDatafeedAction.Request putDatafeedRequest = PutDatafeedAction.Request.parseRequest(datafeedId, indicesOptions, parser); putDatafeedRequest.timeout(restRequest.paramAsTime("timeout", putDatafeedRequest.timeout())); putDatafeedRequest.masterNodeTimeout(restRequest.paramAsTime("master_timeout", putDatafeedRequest.masterNodeTimeout())); return channel -> client.execute(PutDatafeedAction.INSTANCE, putDatafeedRequest, new RestToXContentListener<>(channel)); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestUpdateDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestUpdateDatafeedAction.java index 0375be2907b..cf804692133 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestUpdateDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestUpdateDatafeedAction.java @@ -5,6 +5,8 @@ */ package org.elasticsearch.xpack.ml.rest.datafeeds; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.rest.BaseRestHandler; @@ -44,8 +46,15 @@ public class RestUpdateDatafeedAction extends BaseRestHandler { @Override protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { String datafeedId = restRequest.param(DatafeedConfig.ID.getPreferredName()); + IndicesOptions indicesOptions = null; + if (restRequest.hasParam("expand_wildcards") + || restRequest.hasParam("ignore_unavailable") + || restRequest.hasParam("allow_no_indices") + || restRequest.hasParam("ignore_throttled")) { + indicesOptions = IndicesOptions.fromRequest(restRequest, SearchRequest.DEFAULT_INDICES_OPTIONS); + } XContentParser parser = restRequest.contentParser(); - UpdateDatafeedAction.Request updateDatafeedRequest = UpdateDatafeedAction.Request.parseRequest(datafeedId, parser); + UpdateDatafeedAction.Request updateDatafeedRequest = UpdateDatafeedAction.Request.parseRequest(datafeedId, indicesOptions, parser); updateDatafeedRequest.timeout(restRequest.paramAsTime("timeout", updateDatafeedRequest.timeout())); updateDatafeedRequest.masterNodeTimeout(restRequest.paramAsTime("master_timeout", updateDatafeedRequest.masterNodeTimeout())); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java index 39f3a3f4889..3c986b764a0 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml.datafeed; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; +import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -75,10 +76,19 @@ public class DatafeedNodeSelectorTests extends ESTestCase { givenClusterState("foo", 1, 0); - PersistentTasksCustomMetaData.Assignment result = - new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).selectNode(); + PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, + resolver, + df.getId(), + df.getJobId(), + df.getIndices(), + SearchRequest.DEFAULT_INDICES_OPTIONS).selectNode(); assertEquals("node_id", result.getExecutorNode()); - new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).checkDatafeedTaskCanBeCreated(); + new DatafeedNodeSelector(clusterState, + resolver, + df.getId(), + df.getJobId(), + df.getIndices(), + SearchRequest.DEFAULT_INDICES_OPTIONS).checkDatafeedTaskCanBeCreated(); } public void testSelectNode_GivenJobIsOpening() { @@ -91,10 +101,19 @@ public class DatafeedNodeSelectorTests extends ESTestCase { givenClusterState("foo", 1, 0); - PersistentTasksCustomMetaData.Assignment result = - new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).selectNode(); + PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, + resolver, + df.getId(), + df.getJobId(), + df.getIndices(), + SearchRequest.DEFAULT_INDICES_OPTIONS).selectNode(); assertEquals("node_id", result.getExecutorNode()); - new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).checkDatafeedTaskCanBeCreated(); + new DatafeedNodeSelector(clusterState, + resolver, + df.getId(), + df.getJobId(), + df.getIndices(), + SearchRequest.DEFAULT_INDICES_OPTIONS).checkDatafeedTaskCanBeCreated(); } public void testNoJobTask() { @@ -107,15 +126,23 @@ public class DatafeedNodeSelectorTests extends ESTestCase { givenClusterState("foo", 1, 0); - PersistentTasksCustomMetaData.Assignment result = - new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).selectNode(); + PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, + resolver, + df.getId(), + df.getJobId(), + df.getIndices(), + SearchRequest.DEFAULT_INDICES_OPTIONS).selectNode(); assertNull(result.getExecutorNode()); assertThat(result.getExplanation(), equalTo("cannot start datafeed [datafeed_id], because the job's [job_id] state is " + "[closed] while state [opened] is required")); ElasticsearchException e = expectThrows(ElasticsearchException.class, - () -> new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()) - .checkDatafeedTaskCanBeCreated()); + () -> new DatafeedNodeSelector(clusterState, + resolver, + df.getId(), + df.getJobId(), + df.getIndices(), + SearchRequest.DEFAULT_INDICES_OPTIONS).checkDatafeedTaskCanBeCreated()); assertThat(e.getMessage(), containsString("No node found to start datafeed [datafeed_id], allocation explanation " + "[cannot start datafeed [datafeed_id], because the job's [job_id] state is [closed] while state [opened] is required]")); } @@ -131,15 +158,23 @@ public class DatafeedNodeSelectorTests extends ESTestCase { givenClusterState("foo", 1, 0); - PersistentTasksCustomMetaData.Assignment result = - new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).selectNode(); + PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, + resolver, + df.getId(), + df.getJobId(), + df.getIndices(), + SearchRequest.DEFAULT_INDICES_OPTIONS).selectNode(); assertNull(result.getExecutorNode()); assertEquals("cannot start datafeed [datafeed_id], because the job's [job_id] state is [" + jobState + "] while state [opened] is required", result.getExplanation()); ElasticsearchException e = expectThrows(ElasticsearchException.class, - () -> new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()) - .checkDatafeedTaskCanBeCreated()); + () -> new DatafeedNodeSelector(clusterState, + resolver, + df.getId(), + df.getJobId(), + df.getIndices(), + SearchRequest.DEFAULT_INDICES_OPTIONS).checkDatafeedTaskCanBeCreated()); assertThat(e.getMessage(), containsString("No node found to start datafeed [datafeed_id], allocation explanation " + "[cannot start datafeed [datafeed_id], because the job's [job_id] state is [" + jobState + "] while state [opened] is required]")); @@ -160,13 +195,22 @@ public class DatafeedNodeSelectorTests extends ESTestCase { givenClusterState("foo", 1, 0, states); - PersistentTasksCustomMetaData.Assignment result = - new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).selectNode(); + PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, + resolver, + df.getId(), + df.getJobId(), + df.getIndices(), + SearchRequest.DEFAULT_INDICES_OPTIONS).selectNode(); assertNull(result.getExecutorNode()); assertThat(result.getExplanation(), equalTo("cannot start datafeed [datafeed_id] because index [foo] " + "does not have all primary shards active yet.")); - new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).checkDatafeedTaskCanBeCreated(); + new DatafeedNodeSelector(clusterState, + resolver, + df.getId(), + df.getJobId(), + df.getIndices(), + SearchRequest.DEFAULT_INDICES_OPTIONS).checkDatafeedTaskCanBeCreated(); } public void testShardNotAllActive() { @@ -185,13 +229,22 @@ public class DatafeedNodeSelectorTests extends ESTestCase { givenClusterState("foo", 2, 0, states); - PersistentTasksCustomMetaData.Assignment result = - new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).selectNode(); + PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, + resolver, + df.getId(), + df.getJobId(), + df.getIndices(), + SearchRequest.DEFAULT_INDICES_OPTIONS).selectNode(); assertNull(result.getExecutorNode()); assertThat(result.getExplanation(), equalTo("cannot start datafeed [datafeed_id] because index [foo] " + "does not have all primary shards active yet.")); - new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).checkDatafeedTaskCanBeCreated(); + new DatafeedNodeSelector(clusterState, + resolver, + df.getId(), + df.getJobId(), + df.getIndices(), + SearchRequest.DEFAULT_INDICES_OPTIONS).checkDatafeedTaskCanBeCreated(); } public void testIndexDoesntExist() { @@ -204,17 +257,32 @@ public class DatafeedNodeSelectorTests extends ESTestCase { givenClusterState("foo", 1, 0); - PersistentTasksCustomMetaData.Assignment result = - new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).selectNode(); + PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, + resolver, + df.getId(), + df.getJobId(), + df.getIndices(), + SearchRequest.DEFAULT_INDICES_OPTIONS).selectNode(); assertNull(result.getExecutorNode()); - assertThat(result.getExplanation(), equalTo("cannot start datafeed [datafeed_id] because index [not_foo] " + - "does not exist, is closed, or is still initializing.")); + assertThat(result.getExplanation(), + equalTo("cannot start datafeed [datafeed_id] because it failed resolving indices given [not_foo] and " + + "indices_options [IndicesOptions[ignore_unavailable=false, allow_no_indices=true, expand_wildcards_open=true, " + + "expand_wildcards_closed=false, expand_wildcards_hidden=false, allow_aliases_to_multiple_indices=true, " + + "forbid_closed_indices=true, ignore_aliases=false, ignore_throttled=true]] with exception [no such index [not_foo]]")); ElasticsearchException e = expectThrows(ElasticsearchException.class, - () -> new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()) - .checkDatafeedTaskCanBeCreated()); + () -> new DatafeedNodeSelector(clusterState, + resolver, + df.getId(), + df.getJobId(), + df.getIndices(), + SearchRequest.DEFAULT_INDICES_OPTIONS).checkDatafeedTaskCanBeCreated()); assertThat(e.getMessage(), containsString("No node found to start datafeed [datafeed_id], allocation explanation " - + "[cannot start datafeed [datafeed_id] because index [not_foo] does not exist, is closed, or is still initializing.]")); + + "[cannot start datafeed [datafeed_id] because it failed resolving " + + "indices given [not_foo] and indices_options [IndicesOptions[ignore_unavailable=false, allow_no_indices=true, " + + "expand_wildcards_open=true, expand_wildcards_closed=false, expand_wildcards_hidden=false, " + + "allow_aliases_to_multiple_indices=true, forbid_closed_indices=true, ignore_aliases=false, ignore_throttled=true]] " + + "with exception [no such index [not_foo]]]")); } public void testRemoteIndex() { @@ -227,8 +295,12 @@ public class DatafeedNodeSelectorTests extends ESTestCase { givenClusterState("foo", 1, 0); - PersistentTasksCustomMetaData.Assignment result = - new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).selectNode(); + PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, + resolver, + df.getId(), + df.getJobId(), + df.getIndices(), + SearchRequest.DEFAULT_INDICES_OPTIONS).selectNode(); assertNotNull(result.getExecutorNode()); } @@ -245,15 +317,23 @@ public class DatafeedNodeSelectorTests extends ESTestCase { givenClusterState("foo", 1, 0); - PersistentTasksCustomMetaData.Assignment result = - new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).selectNode(); + PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, + resolver, + df.getId(), + df.getJobId(), + df.getIndices(), + SearchRequest.DEFAULT_INDICES_OPTIONS).selectNode(); assertNull(result.getExecutorNode()); assertEquals("cannot start datafeed [datafeed_id], because the job's [job_id] state is stale", result.getExplanation()); ElasticsearchException e = expectThrows(ElasticsearchException.class, - () -> new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()) - .checkDatafeedTaskCanBeCreated()); + () -> new DatafeedNodeSelector(clusterState, + resolver, + df.getId(), + df.getJobId(), + df.getIndices(), + SearchRequest.DEFAULT_INDICES_OPTIONS).checkDatafeedTaskCanBeCreated()); assertThat(e.getMessage(), containsString("No node found to start datafeed [datafeed_id], allocation explanation " + "[cannot start datafeed [datafeed_id], because the job's [job_id] state is stale]")); @@ -261,9 +341,19 @@ public class DatafeedNodeSelectorTests extends ESTestCase { addJobTask(job.getId(), "node_id1", JobState.OPENED, tasksBuilder); tasks = tasksBuilder.build(); givenClusterState("foo", 1, 0); - result = new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).selectNode(); + result = new DatafeedNodeSelector(clusterState, + resolver, + df.getId(), + df.getJobId(), + df.getIndices(), + SearchRequest.DEFAULT_INDICES_OPTIONS).selectNode(); assertEquals("node_id1", result.getExecutorNode()); - new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).checkDatafeedTaskCanBeCreated(); + new DatafeedNodeSelector(clusterState, + resolver, + df.getId(), + df.getJobId(), + df.getIndices(), + SearchRequest.DEFAULT_INDICES_OPTIONS).checkDatafeedTaskCanBeCreated(); } public void testSelectNode_GivenJobOpeningAndIndexDoesNotExist() { @@ -280,10 +370,17 @@ public class DatafeedNodeSelectorTests extends ESTestCase { givenClusterState("foo", 1, 0); ElasticsearchException e = expectThrows(ElasticsearchException.class, - () -> new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()) - .checkDatafeedTaskCanBeCreated()); + () -> new DatafeedNodeSelector(clusterState, + resolver, + df.getId(), + df.getJobId(), + df.getIndices(), + SearchRequest.DEFAULT_INDICES_OPTIONS).checkDatafeedTaskCanBeCreated()); assertThat(e.getMessage(), containsString("No node found to start datafeed [datafeed_id], allocation explanation " - + "[cannot start datafeed [datafeed_id] because index [not_foo] does not exist, is closed, or is still initializing.]")); + + "[cannot start datafeed [datafeed_id] because it failed resolving indices given [not_foo] and " + + "indices_options [IndicesOptions[ignore_unavailable=false, allow_no_indices=true, expand_wildcards_open=true, " + + "expand_wildcards_closed=false, expand_wildcards_hidden=false, allow_aliases_to_multiple_indices=true, " + + "forbid_closed_indices=true, ignore_aliases=false, ignore_throttled=true]] with exception [no such index [not_foo]]]")); } public void testSelectNode_GivenMlUpgradeMode() { @@ -297,8 +394,12 @@ public class DatafeedNodeSelectorTests extends ESTestCase { givenClusterState("foo", 1, 0); - PersistentTasksCustomMetaData.Assignment result = - new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).selectNode(); + PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, + resolver, + df.getId(), + df.getJobId(), + df.getIndices(), + SearchRequest.DEFAULT_INDICES_OPTIONS).selectNode(); assertThat(result, equalTo(MlTasks.AWAITING_UPGRADE)); } @@ -314,8 +415,12 @@ public class DatafeedNodeSelectorTests extends ESTestCase { givenClusterState("foo", 1, 0); ElasticsearchException e = expectThrows(ElasticsearchException.class, - () -> new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()) - .checkDatafeedTaskCanBeCreated()); + () -> new DatafeedNodeSelector(clusterState, + resolver, + df.getId(), + df.getJobId(), + df.getIndices(), + SearchRequest.DEFAULT_INDICES_OPTIONS).checkDatafeedTaskCanBeCreated()); assertThat(e.getMessage(), equalTo("Could not start datafeed [datafeed_id] as indices are being upgraded")); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorTests.java index 537a6b44d0b..72c3edb33cd 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorTests.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.ml.datafeed.extractor.aggregation; +import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.ShardSearchFailure; @@ -272,7 +273,7 @@ public class AggregationDataExtractorTests extends ESTestCase { private AggregationDataExtractorContext createContext(long start, long end) { return new AggregationDataExtractorContext(jobId, timeField, fields, indices, query, aggs, start, end, true, - Collections.emptyMap()); + Collections.emptyMap(), SearchRequest.DEFAULT_INDICES_OPTIONS); } @SuppressWarnings("unchecked") diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorTests.java index e7ac3625439..657545d5f3e 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorTests.java @@ -565,7 +565,8 @@ public class ChunkedDataExtractorTests extends ESTestCase { private ChunkedDataExtractorContext createContext(long start, long end, boolean hasAggregations, Long histogramInterval) { return new ChunkedDataExtractorContext(jobId, timeField, indices, query, scrollSize, start, end, chunkSpan, - ChunkedDataExtractorFactory.newIdentityTimeAligner(), Collections.emptyMap(), hasAggregations, histogramInterval); + ChunkedDataExtractorFactory.newIdentityTimeAligner(), Collections.emptyMap(), hasAggregations, histogramInterval, + SearchRequest.DEFAULT_INDICES_OPTIONS); } private static class StubSubExtractor implements DataExtractor { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorTests.java index 962bd9ee6d3..76e86a9a821 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorTests.java @@ -11,6 +11,7 @@ import org.elasticsearch.action.search.ClearScrollAction; import org.elasticsearch.action.search.ClearScrollRequest; import org.elasticsearch.action.search.ClearScrollResponse; import org.elasticsearch.action.search.SearchPhaseExecutionException; +import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.ShardSearchFailure; @@ -444,7 +445,7 @@ public class ScrollDataExtractorTests extends ESTestCase { List sFields = Arrays.asList(withoutSplit, withSplit); ScrollDataExtractorContext context = new ScrollDataExtractorContext(jobId, extractedFields, indices, - query, sFields, scrollSize, 1000, 2000, Collections.emptyMap()); + query, sFields, scrollSize, 1000, 2000, Collections.emptyMap(), SearchRequest.DEFAULT_INDICES_OPTIONS); TestDataExtractor extractor = new TestDataExtractor(context); @@ -490,7 +491,7 @@ public class ScrollDataExtractorTests extends ESTestCase { private ScrollDataExtractorContext createContext(long start, long end) { return new ScrollDataExtractorContext(jobId, extractedFields, indices, query, scriptFields, scrollSize, start, end, - Collections.emptyMap()); + Collections.emptyMap(), SearchRequest.DEFAULT_INDICES_OPTIONS); } private SearchResponse createEmptySearchResponse() { diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/api/ml.put_datafeed.json b/x-pack/plugin/src/test/resources/rest-api-spec/api/ml.put_datafeed.json index 3991c682ac2..30985f17800 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/api/ml.put_datafeed.json +++ b/x-pack/plugin/src/test/resources/rest-api-spec/api/ml.put_datafeed.json @@ -23,6 +23,31 @@ "body":{ "description":"The datafeed config", "required":true + }, + "params": { + "ignore_unavailable":{ + "type":"boolean", + "description":"Ignore unavailable indexes (default: false)" + }, + "allow_no_indices":{ + "type":"boolean", + "description":"Ignore if the source indices expressions resolves to no concrete indices (default: true)" + }, + "ignore_throttled":{ + "type":"boolean", + "description":"Ignore indices that are marked as throttled (default: true)" + }, + "expand_wildcards":{ + "type":"enum", + "options":[ + "open", + "closed", + "hidden", + "none", + "all" + ], + "description":"Whether source index expressions should get expanded to open or closed indices (default: open)" + } } } } diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/api/ml.update_datafeed.json b/x-pack/plugin/src/test/resources/rest-api-spec/api/ml.update_datafeed.json index d4c4a1f63d7..f820933c769 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/api/ml.update_datafeed.json +++ b/x-pack/plugin/src/test/resources/rest-api-spec/api/ml.update_datafeed.json @@ -23,6 +23,31 @@ "body":{ "description":"The datafeed update settings", "required":true + }, + "params": { + "ignore_unavailable":{ + "type":"boolean", + "description":"Ignore unavailable indexes (default: false)" + }, + "allow_no_indices":{ + "type":"boolean", + "description":"Ignore if the source indices expressions resolves to no concrete indices (default: true)" + }, + "ignore_throttled":{ + "type":"boolean", + "description":"Ignore indices that are marked as throttled (default: true)" + }, + "expand_wildcards":{ + "type":"enum", + "options":[ + "open", + "closed", + "hidden", + "none", + "all" + ], + "description":"Whether source index expressions should get expanded to open or closed indices (default: open)" + } } } } diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/datafeeds_crud.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/datafeeds_crud.yml index da4dfe7ec23..bca3996f227 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/datafeeds_crud.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/datafeeds_crud.yml @@ -415,3 +415,78 @@ setup: datafeed_id: test-datafeed-1 force: true - match: { acknowledged: true } +--- +"Test put and update datafeed with indices options": + - do: + ml.put_datafeed: + datafeed_id: test-datafeed-indices-options-1 + body: > + { + "job_id":"datafeeds-crud-1", + "indexes":["index-foo"], + "indices_options": { + "expand_wildcards": ["closed", "open"], + "ignore_throttled": false + } + } + - match: { datafeed_id: "test-datafeed-indices-options-1" } + + - do: + ml.get_datafeeds: + datafeed_id: test-datafeed-indices-options-1 + + - match: { datafeeds.0.indices_options.ignore_throttled: false } + - length: { datafeeds.0.indices_options.expand_wildcards: 2 } + - match: { datafeeds.0.indices_options.expand_wildcards.0: open } + - match: { datafeeds.0.indices_options.expand_wildcards.1: closed } + + - do: + ml.update_datafeed: + datafeed_id: test-datafeed-indices-options-1 + body: > + { + "indices_options": { + "ignore_throttled": true + } + } + - match: { datafeed_id: "test-datafeed-indices-options-1" } + - do: + ml.get_datafeeds: + datafeed_id: test-datafeed-indices-options-1 + + - match: { datafeeds.0.indices_options.ignore_throttled: true } +--- +"Test put and update datafeed with indices options in params": + - do: + ml.put_datafeed: + datafeed_id: test-datafeed-indices-options-params-1 + ignore_throttled: false + body: > + { + "job_id":"datafeeds-crud-1", + "indexes":["index-foo"] + } + - match: { datafeed_id: "test-datafeed-indices-options-params-1" } + + - do: + ml.get_datafeeds: + datafeed_id: test-datafeed-indices-options-params-1 + + - match: { datafeeds.0.indices_options.ignore_throttled: false } + + + - do: + ml.update_datafeed: + datafeed_id: test-datafeed-indices-options-params-1 + ignore_throttled: false + allow_no_indices: false + body: > + { + } + - match: { datafeed_id: "test-datafeed-indices-options-params-1" } + - do: + ml.get_datafeeds: + datafeed_id: test-datafeed-indices-options-params-1 + + - match: { datafeeds.0.indices_options.ignore_throttled: false } + - match: { datafeeds.0.indices_options.allow_no_indices: false } diff --git a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/40_ml_datafeed_crud.yml b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/40_ml_datafeed_crud.yml index c24318e0b19..08ea8e32df9 100644 --- a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/40_ml_datafeed_crud.yml +++ b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/40_ml_datafeed_crud.yml @@ -2,7 +2,6 @@ setup: - skip: version: "all" reason: "AwaitsFix https://github.com/elastic/elasticsearch/issues/42258" - --- "Put job and datafeed without aggs in old cluster": diff --git a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/40_ml_datafeed_crud.yml b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/40_ml_datafeed_crud.yml index 5d824714e00..15e907db715 100644 --- a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/40_ml_datafeed_crud.yml +++ b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/40_ml_datafeed_crud.yml @@ -2,7 +2,6 @@ setup: - skip: version: "all" reason: "AwaitsFix https://github.com/elastic/elasticsearch/issues/42258" - - do: cluster.health: wait_for_status: green