[7.x][ML] Add optional source filtering during data frame reindexing (#49690) (#49718)

This adds a `_source` setting under the `source` setting of a data
frame analytics config. The new `_source` setting reuses the structure
of a `FetchSourceContext`, as `analyzed_fields` does. Specifying
includes and excludes for the source selects which fields are
reindexed and thus available in the destination index.

Closes #49531

Backport of #49690
Dimitris Athanasiou committed 2019-11-29 16:10:44 +02:00 via GitHub
parent 813b49adb4
commit 4edb2e7bb6
28 changed files with 525 additions and 123 deletions
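
For context, a minimal sketch of the new setting as used through the Java high-level REST client (mirroring the documentation test below); the index and field names are illustrative only:

import org.elasticsearch.client.ml.dataframe.DataFrameAnalyticsSource;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;

// Hedged sketch: reindex only selected fields into the destination index.
// "my-source-index", "field_a", "field_b" and "internal_*" are invented names.
DataFrameAnalyticsSource sourceConfig = DataFrameAnalyticsSource.builder()
    .setIndex("my-source-index")
    .setSourceFiltering(new FetchSourceContext(true,
        new String[] { "field_a", "field_b" },   // includes: kept in the destination index
        new String[] { "internal_*" }))          // excludes: dropped during reindexing
    .build();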


@ -26,6 +26,7 @@ import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import java.io.IOException;
import java.util.Arrays;
@ -44,20 +45,27 @@ public class DataFrameAnalyticsSource implements ToXContentObject {
private static final ParseField INDEX = new ParseField("index");
private static final ParseField QUERY = new ParseField("query");
public static final ParseField _SOURCE = new ParseField("_source");
private static ObjectParser<Builder, Void> PARSER = new ObjectParser<>("data_frame_analytics_source", true, Builder::new);
static {
PARSER.declareStringArray(Builder::setIndex, INDEX);
PARSER.declareObject(Builder::setQueryConfig, (p, c) -> QueryConfig.fromXContent(p), QUERY);
PARSER.declareField(Builder::setSourceFiltering,
(p, c) -> FetchSourceContext.fromXContent(p),
_SOURCE,
ObjectParser.ValueType.OBJECT_ARRAY_BOOLEAN_OR_STRING);
}
private final String[] index;
private final QueryConfig queryConfig;
private final FetchSourceContext sourceFiltering;
private DataFrameAnalyticsSource(String[] index, @Nullable QueryConfig queryConfig, @Nullable FetchSourceContext sourceFiltering) {
this.index = Objects.requireNonNull(index);
this.queryConfig = queryConfig;
this.sourceFiltering = sourceFiltering;
}
public String[] getIndex() {
@ -68,6 +76,10 @@ public class DataFrameAnalyticsSource implements ToXContentObject {
return queryConfig;
}
public FetchSourceContext getSourceFiltering() {
return sourceFiltering;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
@ -75,6 +87,9 @@ public class DataFrameAnalyticsSource implements ToXContentObject {
if (queryConfig != null) {
builder.field(QUERY.getPreferredName(), queryConfig.getQuery());
}
if (sourceFiltering != null) {
builder.field(_SOURCE.getPreferredName(), sourceFiltering);
}
builder.endObject();
return builder;
}
@ -86,12 +101,13 @@ public class DataFrameAnalyticsSource implements ToXContentObject {
DataFrameAnalyticsSource other = (DataFrameAnalyticsSource) o;
return Arrays.equals(index, other.index)
&& Objects.equals(queryConfig, other.queryConfig)
&& Objects.equals(sourceFiltering, other.sourceFiltering);
}
@Override
public int hashCode() {
return Objects.hash(Arrays.asList(index), queryConfig, sourceFiltering);
}
@Override
@ -103,6 +119,7 @@ public class DataFrameAnalyticsSource implements ToXContentObject {
private String[] index;
private QueryConfig queryConfig;
private FetchSourceContext sourceFiltering;
private Builder() {}
@ -121,8 +138,13 @@ public class DataFrameAnalyticsSource implements ToXContentObject {
return this;
}
public Builder setSourceFiltering(FetchSourceContext sourceFiltering) {
this.sourceFiltering = sourceFiltering;
return this;
}
public DataFrameAnalyticsSource build() {
return new DataFrameAnalyticsSource(index, queryConfig, sourceFiltering);
}
}
}


@ -2939,6 +2939,9 @@ public class MlClientDocumentationIT extends ESRestHighLevelClientTestCase {
DataFrameAnalyticsSource sourceConfig = DataFrameAnalyticsSource.builder() // <1>
.setIndex("put-test-source-index") // <2>
.setQueryConfig(queryConfig) // <3>
.setSourceFiltering(new FetchSourceContext(true,
new String[] { "included_field_1", "included_field_2" },
new String[] { "excluded_field" })) // <4>
.build();
// end::put-data-frame-analytics-source-config


@ -23,6 +23,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.test.AbstractXContentTestCase;
import java.io.IOException;
@ -35,9 +36,17 @@ import static org.elasticsearch.client.ml.dataframe.QueryConfigTests.randomQuery
public class DataFrameAnalyticsSourceTests extends AbstractXContentTestCase<DataFrameAnalyticsSource> {
public static DataFrameAnalyticsSource randomSourceConfig() {
FetchSourceContext sourceFiltering = null;
if (randomBoolean()) {
sourceFiltering = new FetchSourceContext(true,
generateRandomStringArray(10, 10, false, false),
generateRandomStringArray(10, 10, false, false));
}
return DataFrameAnalyticsSource.builder()
.setIndex(generateRandomStringArray(10, 10, false, false))
.setQueryConfig(randomBoolean() ? null : randomQueryConfig())
.setSourceFiltering(sourceFiltering)
.build();
}


@ -52,6 +52,7 @@ include-tagged::{doc-tests-file}[{api}-source-config]
<1> Constructing a new DataFrameAnalyticsSource
<2> The source index
<3> The query from which to gather the data. If query is not set, a `match_all` query is used by default.
<4> Source filtering to select which fields will exist in the destination index.
===== QueryConfig


@ -16,17 +16,18 @@
<<dfanalytics-types>>.
`analyzed_fields`::
(Optional, object) Specify `includes` and/or `excludes` patterns to select
which fields will be included in the analysis. If `analyzed_fields` is not set,
only the relevant fields will be included. For example, all the numeric fields
for {oldetection}. For the supported field types, see <<ml-put-dfanalytics-supported-fields>>.
Also see <<explain-dfanalytics>>, which helps in understanding field selection.
`includes`:::
(Optional, array) An array of strings that defines the fields that will be included in
the analysis.
`excludes`:::
(Optional, array) An array of strings that defines the fields that will be excluded
from the analysis.
@ -81,8 +82,8 @@ PUT _ml/data_frame/analytics/loganalytics
that setting. For more information, see <<ml-settings>>.
`source`::
(object) The configuration of how to source the analysis data. It requires an `index`.
Optionally, `query` and `_source` may be specified.
`index`:::
(Required, string or array) Index or indices on which to perform the
@ -96,6 +97,19 @@ PUT _ml/data_frame/analytics/loganalytics
as this object is passed verbatim to {es}. By default, this property has
the following value: `{"match_all": {}}`.
`_source`:::
(Optional, object) Specify `includes` and/or `excludes` patterns to select
which fields will be present in the destination. Fields that are excluded
cannot be included in the analysis.
`includes`::::
(array) An array of strings that defines the fields that will be included in
the destination.
`excludes`::::
(array) An array of strings that defines the fields that will be excluded
from the destination.
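
As a hedged illustration of how these settings combine (the index and field names below are invented), using the server-side `DataFrameAnalyticsSource` class that backs this API; note that the validation added in this change rejects `analyzed_fields` includes that `_source` excludes:

// Sketch only: keep "bytes" and "status" in the destination, drop "message".
// Passing null for the query falls back to the default match_all query.
FetchSourceContext sourceFiltering = new FetchSourceContext(true,
    new String[] { "bytes", "status" },  // includes: present in the destination
    new String[] { "message" });         // excludes: dropped, cannot be analyzed
DataFrameAnalyticsSource source = new DataFrameAnalyticsSource(
    new String[] { "logs" }, null, sourceFiltering);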
[[dfanalytics-types]]
==== Analysis objects
@ -277,4 +291,4 @@ improvement. If you override any parameters, then the optimization will
calculate the value of the remaining parameters accordingly and use the value
you provided for the overridden parameter. The number of rounds is reduced
correspondingly. The validation error is estimated in each round by using 4-fold
cross validation.


@ -101,13 +101,13 @@ single number. For example, in case of age ranges, you can model the values as
<<dfanalytics-types>>.
`analyzed_fields`::
(Optional, object) Specify `includes` and/or `excludes` patterns to select
which fields will be included in the analysis. If `analyzed_fields` is not set,
only the relevant fields will be included. For example, all the numeric fields
for {oldetection}. For the supported field types, see <<ml-put-dfanalytics-supported-fields>>.
Also see <<explain-dfanalytics>>, which helps in understanding field selection.
`includes`:::
(Optional, array) An array of strings that defines the fields that will be
included in the analysis.
@ -142,20 +142,33 @@ single number. For example, in case of age ranges, you can model the values as
that setting. For more information, see <<ml-settings>>.
`source`::
(object) The configuration of how to source the analysis data. It requires an `index`.
Optionally, `query` and `_source` may be specified.
`index`:::
(Required, string or array) Index or indices on which to perform the
analysis. It can be a single index or index pattern as well as an array of
indices or patterns.
`query`:::
(Optional, object) The {es} query domain-specific language
(<<query-dsl,DSL>>). This value corresponds to the query object in an {es}
search POST body. All the options that are supported by {es} can be used,
as this object is passed verbatim to {es}. By default, this property has
the following value: `{"match_all": {}}`.
`_source`:::
(Optional, object) Specify `includes` and/or `excludes` patterns to select
which fields will be present in the destination. Fields that are excluded
cannot be included in the analysis.
`includes`::::
(array) An array of strings that defines the fields that will be included in
the destination.
`excludes`::::
(array) An array of strings that defines the fields that will be excluded
from the destination.
`allow_lazy_start`::
(Optional, boolean) Whether this job should be allowed to start when there


@ -8,6 +8,7 @@ package org.elasticsearch.xpack.core.ml.action;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.ValidateActions;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
@ -18,6 +19,7 @@ import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsSource;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import java.io.IOException;
@ -87,6 +89,24 @@ public class PutDataFrameAnalyticsAction extends ActionType<PutDataFrameAnalytic
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException error = null;
error = checkNoIncludedAnalyzedFieldsAreExcludedBySourceFiltering(config, error);
return error;
}
private ActionRequestValidationException checkNoIncludedAnalyzedFieldsAreExcludedBySourceFiltering(
DataFrameAnalyticsConfig config, ActionRequestValidationException error) {
if (config.getAnalyzedFields() == null) {
return null;
}
for (String analyzedInclude : config.getAnalyzedFields().includes()) {
if (config.getSource().isFieldExcluded(analyzedInclude)) {
return ValidateActions.addValidationError("field [" + analyzedInclude + "] is included in ["
+ DataFrameAnalyticsConfig.ANALYZED_FIELDS.getPreferredName() + "] but not in ["
+ DataFrameAnalyticsConfig.SOURCE.getPreferredName() + "."
+ DataFrameAnalyticsSource._SOURCE.getPreferredName() + "]", error);
}
}
return null;
}


@ -127,7 +127,7 @@ public class DataFrameAnalyticsConfig implements ToXContentObject, Writeable {
private final Version version;
private final boolean allowLazyStart;
private DataFrameAnalyticsConfig(String id, String description, DataFrameAnalyticsSource source, DataFrameAnalyticsDest dest,
DataFrameAnalysis analysis, Map<String, String> headers, ByteSizeValue modelMemoryLimit,
FetchSourceContext analyzedFields, Instant createTime, Version version, boolean allowLazyStart) {
this.id = ExceptionsHelper.requireNonNull(id, ID);


@ -6,17 +6,21 @@
package org.elasticsearch.xpack.core.ml.dataframe;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.ml.utils.QueryProvider;
@ -33,20 +37,29 @@ public class DataFrameAnalyticsSource implements Writeable, ToXContentObject {
public static final ParseField INDEX = new ParseField("index");
public static final ParseField QUERY = new ParseField("query");
public static final ParseField _SOURCE = new ParseField("_source");
public static ConstructingObjectParser<DataFrameAnalyticsSource, Void> createParser(boolean ignoreUnknownFields) {
ConstructingObjectParser<DataFrameAnalyticsSource, Void> parser = new ConstructingObjectParser<>("data_frame_analytics_source",
ignoreUnknownFields, a -> new DataFrameAnalyticsSource(
    ((List<String>) a[0]).toArray(new String[0]),
    (QueryProvider) a[1],
    (FetchSourceContext) a[2]));
parser.declareStringArray(ConstructingObjectParser.constructorArg(), INDEX);
parser.declareObject(ConstructingObjectParser.optionalConstructorArg(),
(p, c) -> QueryProvider.fromXContent(p, ignoreUnknownFields, Messages.DATA_FRAME_ANALYTICS_BAD_QUERY_FORMAT), QUERY);
parser.declareField(ConstructingObjectParser.optionalConstructorArg(),
(p, c) -> FetchSourceContext.fromXContent(p),
_SOURCE,
ObjectParser.ValueType.OBJECT_ARRAY_BOOLEAN_OR_STRING);
return parser;
}
private final String[] index;
private final QueryProvider queryProvider;
private final FetchSourceContext sourceFiltering;
public DataFrameAnalyticsSource(String[] index, @Nullable QueryProvider queryProvider, @Nullable FetchSourceContext sourceFiltering) {
this.index = ExceptionsHelper.requireNonNull(index, INDEX);
if (index.length == 0) {
throw new IllegalArgumentException("source.index must specify at least one index");
@ -55,22 +68,36 @@ public class DataFrameAnalyticsSource implements Writeable, ToXContentObject {
throw new IllegalArgumentException("source.index must contain non-null and non-empty strings");
}
this.queryProvider = queryProvider == null ? QueryProvider.defaultQuery() : queryProvider;
if (sourceFiltering != null && sourceFiltering.fetchSource() == false) {
throw new IllegalArgumentException("source._source cannot be disabled");
}
this.sourceFiltering = sourceFiltering;
}
public DataFrameAnalyticsSource(StreamInput in) throws IOException {
index = in.readStringArray();
queryProvider = QueryProvider.fromStream(in);
if (in.getVersion().onOrAfter(Version.V_7_6_0)) {
sourceFiltering = in.readOptionalWriteable(FetchSourceContext::new);
} else {
sourceFiltering = null;
}
}
public DataFrameAnalyticsSource(DataFrameAnalyticsSource other) {
this.index = Arrays.copyOf(other.index, other.index.length);
this.queryProvider = new QueryProvider(other.queryProvider);
this.sourceFiltering = other.sourceFiltering == null ? null : new FetchSourceContext(
other.sourceFiltering.fetchSource(), other.sourceFiltering.includes(), other.sourceFiltering.excludes());
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeStringArray(index);
queryProvider.writeTo(out);
if (out.getVersion().onOrAfter(Version.V_7_6_0)) {
out.writeOptionalWriteable(sourceFiltering);
}
}
@Override
@ -78,6 +105,9 @@ public class DataFrameAnalyticsSource implements Writeable, ToXContentObject {
builder.startObject();
builder.array(INDEX.getPreferredName(), index);
builder.field(QUERY.getPreferredName(), queryProvider.getQuery());
if (sourceFiltering != null) {
builder.field(_SOURCE.getPreferredName(), sourceFiltering);
}
builder.endObject();
return builder;
}
@ -89,12 +119,13 @@ public class DataFrameAnalyticsSource implements Writeable, ToXContentObject {
DataFrameAnalyticsSource other = (DataFrameAnalyticsSource) o;
return Arrays.equals(index, other.index)
&& Objects.equals(queryProvider, other.queryProvider)
&& Objects.equals(sourceFiltering, other.sourceFiltering);
}
@Override
public int hashCode() {
return Objects.hash(Arrays.asList(index), queryProvider, sourceFiltering);
}
public String[] getIndex() {
@ -118,6 +149,10 @@ public class DataFrameAnalyticsSource implements Writeable, ToXContentObject {
return queryProvider.getParsedQuery();
}
public FetchSourceContext getSourceFiltering() {
return sourceFiltering;
}
Exception getQueryParsingException() {
return queryProvider.getParsingException();
}
@ -147,4 +182,47 @@ public class DataFrameAnalyticsSource implements Writeable, ToXContentObject {
Map<String, Object> getQuery() {
return queryProvider.getQuery();
}
public boolean isFieldExcluded(String path) {
if (sourceFiltering == null) {
return false;
}
// First we check in the excludes as they are applied last
for (String exclude : sourceFiltering.excludes()) {
if (pathMatchesSourcePattern(path, exclude)) {
return true;
}
}
// Now we can check the includes
// Empty includes means no further exclusions
if (sourceFiltering.includes().length == 0) {
return false;
}
for (String include : sourceFiltering.includes()) {
if (pathMatchesSourcePattern(path, include)) {
return false;
}
}
return true;
}
private static boolean pathMatchesSourcePattern(String path, String sourcePattern) {
if (sourcePattern.equals(path)) {
return true;
}
if (Regex.isSimpleMatchPattern(sourcePattern)) {
return Regex.simpleMatch(sourcePattern, path);
}
// At this stage sourcePattern is a concrete field name and path is not equal to it.
// We should check if path is a nested field of pattern.
// Let us take "foo" as an example.
// Fields that are "foo.*" should also be matched.
return Regex.simpleMatch(sourcePattern + ".*", path);
}
}
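
A few hedged examples of the matching semantics implemented above, consistent with the tests later in this commit (field names are invented):

// excludes = ["foo"]                     -> isFieldExcluded("foo.bar") == true  (nested under a concrete exclude)
// includes = ["foo*"]                    -> isFieldExcluded("fo*")     == true  (pattern not covered by the includes)
// includes = ["foo"], excludes = ["foo"] -> isFieldExcluded("foo")     == true  (excludes are applied last and win)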


@ -418,6 +418,9 @@ public class ElasticsearchMappings {
.startObject(DataFrameAnalyticsSource.QUERY.getPreferredName())
.field(ENABLED, false)
.endObject()
.startObject(DataFrameAnalyticsSource._SOURCE.getPreferredName())
.field(ENABLED, false)
.endObject()
.endObject()
.endObject()
.startObject(DataFrameAnalyticsConfig.DEST.getPreferredName())


@ -304,6 +304,7 @@ public final class ReservedFieldNames {
DataFrameAnalyticsDest.RESULTS_FIELD.getPreferredName(),
DataFrameAnalyticsSource.INDEX.getPreferredName(),
DataFrameAnalyticsSource.QUERY.getPreferredName(),
DataFrameAnalyticsSource._SOURCE.getPreferredName(),
OutlierDetection.NAME.getPreferredName(),
OutlierDetection.N_NEIGHBORS.getPreferredName(),
OutlierDetection.METHOD.getPreferredName(),


@ -11,16 +11,25 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.test.AbstractSerializingTestCase;
import org.elasticsearch.xpack.core.ml.action.PutDataFrameAnalyticsAction.Request;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfigTests;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsSource;
import org.elasticsearch.xpack.core.ml.dataframe.analyses.MlDataFrameAnalysisNamedXContentProvider;
import org.elasticsearch.xpack.core.ml.dataframe.analyses.OutlierDetectionTests;
import org.junit.Before;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
public class PutDataFrameAnalyticsActionRequestTests extends AbstractSerializingTestCase<Request> {
private String id;
@ -65,4 +74,39 @@ public class PutDataFrameAnalyticsActionRequestTests extends AbstractSerializing
protected Request doParseInstance(XContentParser parser) {
return Request.parseRequest(id, parser);
}
public void testValidate_GivenRequestWithIncludedAnalyzedFieldThatIsExcludedInSourceFiltering() {
DataFrameAnalyticsSource source = new DataFrameAnalyticsSource(new String[] {"index"}, null,
new FetchSourceContext(true, null, new String[] {"excluded"}));
FetchSourceContext analyzedFields = new FetchSourceContext(true, new String[] {"excluded"}, null);
DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder()
.setId("foo")
.setSource(source)
.setAnalysis(OutlierDetectionTests.createRandom())
.setAnalyzedFields(analyzedFields)
.buildForExplain();
Request request = new Request(config);
Exception e = request.validate();
assertThat(e, is(notNullValue()));
assertThat(e.getMessage(), containsString("field [excluded] is included in [analyzed_fields] but not in [source._source]"));
}
public void testValidate_GivenRequestWithIncludedAnalyzedFieldThatIsIncludedInSourceFiltering() {
DataFrameAnalyticsSource source = new DataFrameAnalyticsSource(new String[] {"index"}, null,
new FetchSourceContext(true, new String[] {"included"}, null));
FetchSourceContext analyzedFields = new FetchSourceContext(true, new String[] {"included"}, null);
DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder()
.setId("foo")
.setSource(source)
.setAnalysis(OutlierDetectionTests.createRandom())
.setAnalyzedFields(analyzedFields)
.buildForExplain();
Request request = new Request(config);
Exception e = request.validate();
assertThat(e, is(nullValue()));
}
}


@ -12,12 +12,18 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.test.AbstractSerializingTestCase;
import org.elasticsearch.xpack.core.ml.utils.QueryProvider;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
public class DataFrameAnalyticsSourceTests extends AbstractSerializingTestCase<DataFrameAnalyticsSource> {
@ -46,6 +52,7 @@ public class DataFrameAnalyticsSourceTests extends AbstractSerializingTestCase<D
public static DataFrameAnalyticsSource createRandom() {
String[] index = generateRandomStringArray(10, 10, false, false);
QueryProvider queryProvider = null;
FetchSourceContext sourceFiltering = null;
if (randomBoolean()) {
try {
queryProvider = QueryProvider.fromParsedQuery(QueryBuilders.termQuery(randomAlphaOfLength(10), randomAlphaOfLength(10)));
@ -54,11 +61,75 @@ public class DataFrameAnalyticsSourceTests extends AbstractSerializingTestCase<D
throw new UncheckedIOException(e);
}
}
if (randomBoolean()) {
    sourceFiltering = new FetchSourceContext(true,
        generateRandomStringArray(10, 10, false, false),
        generateRandomStringArray(10, 10, false, false));
}
return new DataFrameAnalyticsSource(index, queryProvider, sourceFiltering);
}
@Override
protected Writeable.Reader<DataFrameAnalyticsSource> instanceReader() {
return DataFrameAnalyticsSource::new;
}
public void testConstructor_GivenDisabledSource() {
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> new DataFrameAnalyticsSource(
new String[] {"index"}, null, new FetchSourceContext(false, null, null)));
assertThat(e.getMessage(), equalTo("source._source cannot be disabled"));
}
public void testIsFieldExcluded_GivenNoSourceFiltering() {
DataFrameAnalyticsSource source = new DataFrameAnalyticsSource(new String[] { "index" }, null, null);
assertThat(source.isFieldExcluded(randomAlphaOfLength(10)), is(false));
}
public void testIsFieldExcluded_GivenSourceFilteringWithNulls() {
DataFrameAnalyticsSource source = new DataFrameAnalyticsSource(new String[] { "index" }, null,
new FetchSourceContext(true, null, null));
assertThat(source.isFieldExcluded(randomAlphaOfLength(10)), is(false));
}
public void testIsFieldExcluded_GivenExcludes() {
assertThat(newSourceWithExcludes("foo").isFieldExcluded("bar"), is(false));
assertThat(newSourceWithExcludes("foo").isFieldExcluded("foo"), is(true));
assertThat(newSourceWithExcludes("foo").isFieldExcluded("foo.bar"), is(true));
assertThat(newSourceWithExcludes("foo*").isFieldExcluded("foo"), is(true));
assertThat(newSourceWithExcludes("foo*").isFieldExcluded("foobar"), is(true));
assertThat(newSourceWithExcludes("foo*").isFieldExcluded("foo.bar"), is(true));
assertThat(newSourceWithExcludes("foo*").isFieldExcluded("foo*"), is(true));
assertThat(newSourceWithExcludes("foo*").isFieldExcluded("fo*"), is(false));
}
public void testIsFieldExcluded_GivenIncludes() {
assertThat(newSourceWithIncludes("foo").isFieldExcluded("bar"), is(true));
assertThat(newSourceWithIncludes("foo").isFieldExcluded("foo"), is(false));
assertThat(newSourceWithIncludes("foo").isFieldExcluded("foo.bar"), is(false));
assertThat(newSourceWithIncludes("foo*").isFieldExcluded("foo"), is(false));
assertThat(newSourceWithIncludes("foo*").isFieldExcluded("foobar"), is(false));
assertThat(newSourceWithIncludes("foo*").isFieldExcluded("foo.bar"), is(false));
assertThat(newSourceWithIncludes("foo*").isFieldExcluded("foo*"), is(false));
assertThat(newSourceWithIncludes("foo*").isFieldExcluded("fo*"), is(true));
}
public void testIsFieldExcluded_GivenIncludesAndExcludes() {
// Excludes take precedence
assertThat(newSourceWithIncludesExcludes(Collections.singletonList("foo"), Collections.singletonList("foo"))
.isFieldExcluded("foo"), is(true));
}
private static DataFrameAnalyticsSource newSourceWithIncludes(String... includes) {
return newSourceWithIncludesExcludes(Arrays.asList(includes), Collections.emptyList());
}
private static DataFrameAnalyticsSource newSourceWithExcludes(String... excludes) {
return newSourceWithIncludesExcludes(Collections.emptyList(), Arrays.asList(excludes));
}
private static DataFrameAnalyticsSource newSourceWithIncludesExcludes(List<String> includes, List<String> excludes) {
FetchSourceContext sourceFiltering = new FetchSourceContext(true,
includes.toArray(new String[0]), excludes.toArray(new String[0]));
return new DataFrameAnalyticsSource(new String[] { "index" } , null, sourceFiltering);
}
}


@ -52,6 +52,7 @@ integTest.runner {
'ml/data_frame_analytics_crud/Test put config with dest index included in source via alias',
'ml/data_frame_analytics_crud/Test put config with unknown top level field',
'ml/data_frame_analytics_crud/Test put config with unknown field in outlier detection analysis',
'ml/data_frame_analytics_crud/Test put config given analyzed_fields include field excluded by source',
'ml/data_frame_analytics_crud/Test put config given missing source',
'ml/data_frame_analytics_crud/Test put config given source with empty index array',
'ml/data_frame_analytics_crud/Test put config given source with empty string in index array',


@ -53,7 +53,8 @@ public class ExplainDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsInteg
DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder()
.setId(id)
.setSource(new DataFrameAnalyticsSource(new String[] { sourceIndex },
QueryProvider.fromParsedQuery(QueryBuilders.termQuery("categorical", "only-one")),
null))
.setAnalysis(new Classification("categorical"))
.buildForExplain();


@ -163,7 +163,7 @@ abstract class MlNativeDataFrameAnalyticsIntegTestCase extends MlNativeIntegTest
@Nullable String resultsField, DataFrameAnalysis analysis) {
return new DataFrameAnalyticsConfig.Builder()
.setId(id)
.setSource(new DataFrameAnalyticsSource(new String[] { sourceIndex }, null, null))
.setDest(new DataFrameAnalyticsDest(destIndex, resultsField))
.setAnalysis(analysis)
.build();


@ -356,7 +356,7 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest
String id = "test_outlier_detection_with_multiple_source_indices";
DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder()
.setId(id)
.setSource(new DataFrameAnalyticsSource(sourceIndex, null, null))
.setDest(new DataFrameAnalyticsDest(destIndex, null))
.setAnalysis(new OutlierDetection.Builder().build())
.build();
@ -472,7 +472,7 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest
ByteSizeValue modelMemoryLimit = new ByteSizeValue(1, ByteSizeUnit.MB);
DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder()
.setId(id)
.setSource(new DataFrameAnalyticsSource(new String[] { sourceIndex }, null, null))
.setDest(new DataFrameAnalyticsDest(sourceIndex + "-results", null))
.setAnalysis(new OutlierDetection.Builder().build())
.setModelMemoryLimit(modelMemoryLimit)
@ -516,7 +516,7 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest
ByteSizeValue modelMemoryLimit = new ByteSizeValue(1, ByteSizeUnit.TB);
DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder()
.setId(id)
.setSource(new DataFrameAnalyticsSource(new String[] { sourceIndex }, null, null))
.setDest(new DataFrameAnalyticsDest(sourceIndex + "-results", null))
.setAnalysis(new OutlierDetection.Builder().build())
.setModelMemoryLimit(modelMemoryLimit)


@ -236,7 +236,7 @@ public class TransportStartDataFrameAnalyticsAction
// Step 5. Validate mappings can be merged
ActionListener<StartContext> toValidateMappingsListener = ActionListener.wrap(
startContext -> MappingsMerger.mergeMappings(client, startContext.config.getHeaders(),
startContext.config.getSource(), ActionListener.wrap(
mappings -> validateMappingsMergeListener.onResponse(startContext), finalListener::onFailure)),
finalListener::onFailure
);


@ -85,8 +85,6 @@ public final class DataFrameAnalyticsIndex {
ActionListener<CreateIndexRequest> listener) {
AtomicReference<Settings> settingsHolder = new AtomicReference<>();
ActionListener<ImmutableOpenMap<String, MappingMetaData>> mappingsListener = ActionListener.wrap(
mappings -> listener.onResponse(createIndexRequest(clock, config, settingsHolder.get(), mappings)),
listener::onFailure
@ -95,7 +93,7 @@ public final class DataFrameAnalyticsIndex {
ActionListener<Settings> settingsListener = ActionListener.wrap(
settings -> {
settingsHolder.set(settings);
MappingsMerger.mergeMappings(client, config.getHeaders(), config.getSource(), mappingsListener);
},
listener::onFailure
);
@ -106,7 +104,7 @@ public final class DataFrameAnalyticsIndex {
);
GetSettingsRequest getSettingsRequest = new GetSettingsRequest();
getSettingsRequest.indices(config.getSource().getIndex());
getSettingsRequest.indicesOptions(IndicesOptions.lenientExpandOpen());
getSettingsRequest.names(PRESERVED_SETTINGS);
ClientHelper.executeWithHeadersAsync(config.getHeaders(), ML_ORIGIN, client, GetSettingsAction.INSTANCE,


@ -179,6 +179,7 @@ public class DataFrameAnalyticsManager {
ReindexRequest reindexRequest = new ReindexRequest();
reindexRequest.setSourceIndices(config.getSource().getIndex());
reindexRequest.setSourceQuery(config.getSource().getParsedQuery());
reindexRequest.getSearchRequest().source().fetchSource(config.getSource().getSourceFiltering());
reindexRequest.setDestIndex(config.getDest().getIndex());
reindexRequest.setScript(new Script("ctx._source." + DataFrameAnalyticsIndex.ID_COPY + " = ctx._id"));


@ -14,6 +14,7 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsSource;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import java.io.IOException;
@ -32,19 +33,20 @@ public final class MappingsMerger {
private MappingsMerger() {}
public static void mergeMappings(Client client, Map<String, String> headers, DataFrameAnalyticsSource source,
ActionListener<ImmutableOpenMap<String, MappingMetaData>> listener) {
ActionListener<GetMappingsResponse> mappingsListener = ActionListener.wrap(
getMappingsResponse -> listener.onResponse(MappingsMerger.mergeMappings(source, getMappingsResponse)),
listener::onFailure
);
GetMappingsRequest getMappingsRequest = new GetMappingsRequest();
getMappingsRequest.indices(source.getIndex());
ClientHelper.executeWithHeadersAsync(headers, ML_ORIGIN, client, GetMappingsAction.INSTANCE, getMappingsRequest, mappingsListener);
}
static ImmutableOpenMap<String, MappingMetaData> mergeMappings(DataFrameAnalyticsSource source,
                                                               GetMappingsResponse getMappingsResponse) {
ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> indexToMappings = getMappingsResponse.getMappings();
String type = null;
@ -71,13 +73,16 @@ public final class MappingsMerger {
Map<String, Object> fieldMappings = (Map<String, Object>) currentMappings.get("properties");
for (Map.Entry<String, Object> fieldMapping : fieldMappings.entrySet()) {
String field = fieldMapping.getKey();
if (source.isFieldExcluded(field) == false) {
    if (mergedMappings.containsKey(field)) {
        if (mergedMappings.get(field).equals(fieldMapping.getValue()) == false) {
            throw ExceptionsHelper.badRequestException(
                "cannot merge mappings because of differences for field [{}]", field);
        }
    } else {
        mergedMappings.put(field, fieldMapping.getValue());
    }
}
}
}


@ -85,6 +85,7 @@ public class ExtractedFieldsDetector {
fields.removeAll(IGNORE_FIELDS);
checkResultsFieldIsNotPresent();
removeFieldsUnderResultsField(fields);
applySourceFiltering(fields);
FetchSourceContext analyzedFields = config.getAnalyzedFields();
// If the user has not explicitly included fields we'll include all compatible fields
@ -132,6 +133,16 @@ public class ExtractedFieldsDetector {
}
}
private void applySourceFiltering(Set<String> fields) {
Iterator<String> fieldsIterator = fields.iterator();
while (fieldsIterator.hasNext()) {
String field = fieldsIterator.next();
if (config.getSource().isFieldExcluded(field)) {
fieldsIterator.remove();
}
}
}
private void addExcludedField(String field, String reason, Set<FieldSelection> fieldSelection) {
fieldSelection.add(FieldSelection.excluded(field, getMappingTypes(field), reason));
}


@ -60,7 +60,7 @@ public class DataFrameAnalyticsIndexTests extends ESTestCase {
private static final DataFrameAnalyticsConfig ANALYTICS_CONFIG =
new DataFrameAnalyticsConfig.Builder()
.setId(ANALYTICS_ID)
.setSource(new DataFrameAnalyticsSource(SOURCE_INDEX, null, null))
.setDest(new DataFrameAnalyticsDest(DEST_INDEX, null))
.setAnalysis(new OutlierDetection.Builder().build())
.build();


@ -10,7 +10,9 @@ import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsSource;
import java.io.IOException;
import java.util.Collections;
@ -48,7 +50,7 @@ public class MappingsMergerTests extends ESTestCase {
GetMappingsResponse getMappingsResponse = new GetMappingsResponse(mappings.build());
ImmutableOpenMap<String, MappingMetaData> mergedMappings = MappingsMerger.mergeMappings(newSource(), getMappingsResponse);
assertThat(mergedMappings.size(), equalTo(1));
assertThat(mergedMappings.containsKey("_doc"), is(true));
@ -76,7 +78,7 @@ public class MappingsMergerTests extends ESTestCase {
GetMappingsResponse getMappingsResponse = new GetMappingsResponse(mappings.build());
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
() -> MappingsMerger.mergeMappings(newSource(), getMappingsResponse));
assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
assertThat(e.getMessage(), containsString("source indices contain mappings for different types:"));
assertThat(e.getMessage(), containsString("type_1"));
@ -104,7 +106,7 @@ public class MappingsMergerTests extends ESTestCase {
GetMappingsResponse getMappingsResponse = new GetMappingsResponse(mappings.build());
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
() -> MappingsMerger.mergeMappings(newSource(), getMappingsResponse));
assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
assertThat(e.getMessage(), equalTo("cannot merge mappings because of differences for field [field_1]"));
}
@ -133,7 +135,7 @@ public class MappingsMergerTests extends ESTestCase {
GetMappingsResponse getMappingsResponse = new GetMappingsResponse(mappings.build());
ImmutableOpenMap<String, MappingMetaData> mergedMappings = MappingsMerger.mergeMappings(newSource(), getMappingsResponse);
assertThat(mergedMappings.size(), equalTo(1));
assertThat(mergedMappings.containsKey("_doc"), is(true));
@ -150,4 +152,41 @@ public class MappingsMergerTests extends ESTestCase {
assertThat(fieldMappings.get("field_2"), equalTo("field_2_mappings"));
assertThat(fieldMappings.get("field_3"), equalTo("field_3_mappings"));
}
public void testMergeMappings_GivenSourceFiltering() throws IOException {
Map<String, Object> indexProperties = new HashMap<>();
indexProperties.put("field_1", "field_1_mappings");
indexProperties.put("field_2", "field_2_mappings");
Map<String, Object> index1Mappings = Collections.singletonMap("properties", indexProperties);
MappingMetaData indexMappingMetaData = new MappingMetaData("_doc", index1Mappings);
ImmutableOpenMap.Builder<String, MappingMetaData> indexMappingsMap = ImmutableOpenMap.builder();
indexMappingsMap.put("_doc", indexMappingMetaData);
ImmutableOpenMap.Builder<String, ImmutableOpenMap<String, MappingMetaData>> mappings = ImmutableOpenMap.builder();
mappings.put("index_1", indexMappingsMap.build());
GetMappingsResponse getMappingsResponse = new GetMappingsResponse(mappings.build());
ImmutableOpenMap<String, MappingMetaData> mergedMappings = MappingsMerger.mergeMappings(
newSourceWithExcludes("field_1"), getMappingsResponse);
assertThat(mergedMappings.size(), equalTo(1));
assertThat(mergedMappings.containsKey("_doc"), is(true));
Map<String, Object> mappingsAsMap = mergedMappings.valuesIt().next().getSourceAsMap();
@SuppressWarnings("unchecked")
Map<String, Object> fieldMappings = (Map<String, Object>) mappingsAsMap.get("properties");
assertThat(fieldMappings.size(), equalTo(1));
assertThat(fieldMappings.containsKey("field_2"), is(true));
}
private static DataFrameAnalyticsSource newSource() {
return new DataFrameAnalyticsSource(new String[] {"index"}, null, null);
}
private static DataFrameAnalyticsSource newSourceWithExcludes(String... excludes) {
return new DataFrameAnalyticsSource(new String[] {"index"}, null,
new FetchSourceContext(true, null, excludes));
}
}


@ -183,6 +183,6 @@ public class SourceDestValidatorTests extends ESTestCase {
}
private static DataFrameAnalyticsSource createSource(String... index) {
return new DataFrameAnalyticsSource(index, null, null);
}
}


@ -45,6 +45,9 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
private static final String DEST_INDEX = "dest_index";
private static final String RESULTS_FIELD = "ml";
private FetchSourceContext sourceFiltering;
private FetchSourceContext analyzedFields;
public void testDetect_GivenFloatField() {
FieldCapabilitiesResponse fieldCapabilities = new MockFieldCapsResponseBuilder()
.addAggregatableField("some_float", "float").build();
@ -86,8 +89,8 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
.addAggregatableField("some_keyword", "keyword").build();
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities, Collections.emptyMap());
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, extractedFieldsDetector::detect);
assertThat(e.getMessage(), equalTo("No compatible fields could be detected in index [source_index]." +
" Supported types are [boolean, byte, double, float, half_float, integer, long, scaled_float, short]."));
@ -99,7 +102,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities, Collections.emptyMap());
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, extractedFieldsDetector::detect);
assertThat(e.getMessage(), equalTo("No compatible fields could be detected in index [source_index]. " +
"Supported types are [boolean, byte, double, float, half_float, integer, long, scaled_float, short]."));
@ -171,7 +174,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
SOURCE_INDEX, buildRegressionConfig("foo"), false, 100, fieldCapabilities, Collections.emptyMap());
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, extractedFieldsDetector::detect);
assertThat(e.getMessage(), equalTo("required field [foo] is missing; analysis requires fields [foo]"));
}
@ -183,11 +186,11 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
.addAggregatableField("some_keyword", "keyword")
.addAggregatableField("foo", "float")
.build();
analyzedFields = new FetchSourceContext(true, new String[0], new String[] {"foo"});
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
SOURCE_INDEX, buildRegressionConfig("foo"), false, 100, fieldCapabilities, Collections.emptyMap());
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, extractedFieldsDetector::detect);
assertThat(e.getMessage(), equalTo("required field [foo] is missing; analysis requires fields [foo]"));
}
@ -199,11 +202,11 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
.addAggregatableField("some_keyword", "keyword")
.addAggregatableField("foo", "float")
.build();
FetchSourceContext analyzedFields = new FetchSourceContext(true, new String[] {"some_float", "some_keyword"}, new String[0]);
analyzedFields = new FetchSourceContext(true, new String[] {"some_float", "some_keyword"}, new String[0]);
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
SOURCE_INDEX, buildRegressionConfig("foo", analyzedFields), false, 100, fieldCapabilities, Collections.emptyMap());
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect());
SOURCE_INDEX, buildRegressionConfig("foo"), false, 100, fieldCapabilities, Collections.emptyMap());
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, extractedFieldsDetector::detect);
assertThat(e.getMessage(), equalTo("required field [foo] is missing; analysis requires fields [foo]"));
}
@ -213,10 +216,10 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
.addAggregatableField("foo", "float")
.addAggregatableField("bar", "float")
.build();
FetchSourceContext analyzedFields = new FetchSourceContext(true, new String[] {"foo", "bar"}, new String[] {"foo"});
analyzedFields = new FetchSourceContext(true, new String[] {"foo", "bar"}, new String[] {"foo"});
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
SOURCE_INDEX, buildOutlierDetectionConfig(analyzedFields), false, 100, fieldCapabilities, Collections.emptyMap());
SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities, Collections.emptyMap());
Tuple<ExtractedFields, List<FieldSelection>> fieldExtraction = extractedFieldsDetector.detect();
List<ExtractedField> allFields = fieldExtraction.v1().getAllFields();
@ -239,7 +242,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
SOURCE_INDEX, buildRegressionConfig("foo"), false, 100, fieldCapabilities, Collections.emptyMap());
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, extractedFieldsDetector::detect);
assertThat(e.getMessage(), equalTo("invalid types [keyword] for required field [foo]; " +
"expected types are [byte, double, float, half_float, integer, long, scaled_float, short]"));
@ -255,7 +258,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
SOURCE_INDEX, buildClassificationConfig("some_float"), false, 100, fieldCapabilities, Collections.emptyMap());
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, extractedFieldsDetector::detect);
assertThat(e.getMessage(), equalTo("invalid types [float] for required field [some_float]; " +
"expected types are [boolean, byte, integer, ip, keyword, long, short, text]"));
@ -270,7 +273,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(SOURCE_INDEX,
buildClassificationConfig("some_keyword"), false, 100, fieldCapabilities, Collections.singletonMap("some_keyword", 3L));
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, extractedFieldsDetector::detect);
assertThat(e.getMessage(), equalTo("Field [some_keyword] must have at most [2] distinct values but there were at least [3]"));
}
@ -281,7 +284,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities, Collections.emptyMap());
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, extractedFieldsDetector::detect);
assertThat(e.getMessage(), equalTo("No compatible fields could be detected in index [source_index]. " +
"Supported types are [boolean, byte, double, float, half_float, integer, long, scaled_float, short]."));
@ -291,11 +294,11 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
FieldCapabilitiesResponse fieldCapabilities = new MockFieldCapsResponseBuilder()
.addAggregatableField("_id", "float")
.build();
FetchSourceContext analyzedFields = new FetchSourceContext(true, new String[]{"_id"}, new String[0]);
analyzedFields = new FetchSourceContext(true, new String[]{"_id"}, new String[0]);
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
SOURCE_INDEX, buildOutlierDetectionConfig(analyzedFields), false, 100, fieldCapabilities, Collections.emptyMap());
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect());
SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities, Collections.emptyMap());
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, extractedFieldsDetector::detect);
assertThat(e.getMessage(), equalTo("No field [_id] could be detected"));
}
@ -304,11 +307,11 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
FieldCapabilitiesResponse fieldCapabilities = new MockFieldCapsResponseBuilder()
.addAggregatableField("foo", "float")
.build();
FetchSourceContext analyzedFields = new FetchSourceContext(true, new String[]{"*"}, new String[] {"bar"});
analyzedFields = new FetchSourceContext(true, new String[]{"*"}, new String[] {"bar"});
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
SOURCE_INDEX, buildOutlierDetectionConfig(analyzedFields), false, 100, fieldCapabilities, Collections.emptyMap());
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect());
SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities, Collections.emptyMap());
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, extractedFieldsDetector::detect);
assertThat(e.getMessage(), equalTo("No field [bar] could be detected"));
}
@ -318,10 +321,10 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
.addAggregatableField("numeric", "float")
.addAggregatableField("categorical", "keyword")
.build();
analyzedFields = new FetchSourceContext(true, null, new String[] {"categorical"});
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities, Collections.emptyMap());
Tuple<ExtractedFields, List<FieldSelection>> fieldExtraction = extractedFieldsDetector.detect();
@ -366,11 +369,11 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
.addAggregatableField("my_field2", "float")
.build();
FetchSourceContext desiredFields = new FetchSourceContext(true, new String[]{"your_field1", "my*"}, new String[0]);
analyzedFields = new FetchSourceContext(true, new String[]{"your_field1", "my*"}, new String[0]);
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
SOURCE_INDEX, buildOutlierDetectionConfig(desiredFields), false, 100, fieldCapabilities, Collections.emptyMap());
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect());
SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities, Collections.emptyMap());
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, extractedFieldsDetector::detect);
assertThat(e.getMessage(), equalTo("No field [your_field1] could be detected"));
}
@ -381,11 +384,11 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
.addAggregatableField("my_field2", "float")
.build();
analyzedFields = new FetchSourceContext(true, new String[0], new String[]{"my_*"});
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities, Collections.emptyMap());
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, extractedFieldsDetector::detect);
assertThat(e.getMessage(), equalTo("No compatible fields could be detected in index [source_index]. " +
"Supported types are [boolean, byte, double, float, half_float, integer, long, scaled_float, short]."));
}
@@ -397,10 +400,10 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
.addAggregatableField("your_field2", "float")
.build();
FetchSourceContext desiredFields = new FetchSourceContext(true, new String[]{"your*", "my_*"}, new String[]{"*nope"});
analyzedFields = new FetchSourceContext(true, new String[]{"your*", "my_*"}, new String[]{"*nope"});
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
SOURCE_INDEX, buildOutlierDetectionConfig(desiredFields), false, 100, fieldCapabilities, Collections.emptyMap());
SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities, Collections.emptyMap());
Tuple<ExtractedFields, List<FieldSelection>> fieldExtraction = extractedFieldsDetector.detect();
List<String> extractedFieldNames = fieldExtraction.v1().getAllFields().stream().map(ExtractedField::getName)
@@ -422,11 +425,11 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
.addAggregatableField("your_keyword", "keyword")
.build();
FetchSourceContext desiredFields = new FetchSourceContext(true, new String[]{"your*", "my_*"}, new String[]{"*nope"});
analyzedFields = new FetchSourceContext(true, new String[]{"your*", "my_*"}, new String[]{"*nope"});
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
SOURCE_INDEX, buildOutlierDetectionConfig(desiredFields), false, 100, fieldCapabilities, Collections.emptyMap());
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect());
SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities, Collections.emptyMap());
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, extractedFieldsDetector::detect);
assertThat(e.getMessage(), equalTo("field [your_keyword] has unsupported type [keyword]. " +
"Supported types are [boolean, byte, double, float, half_float, integer, long, scaled_float, short]."));
@@ -442,7 +445,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities, Collections.emptyMap());
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect());
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, extractedFieldsDetector::detect);
assertThat(e.getMessage(), equalTo("A field that matches the dest.results_field [ml] already exists; " +
"please set a different results_field"));
@@ -479,11 +482,11 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
.addAggregatableField("your_field2", "float")
.addAggregatableField("your_keyword", "keyword")
.build();
FetchSourceContext analyzedFields = new FetchSourceContext(true, new String[]{RESULTS_FIELD}, new String[0]);
analyzedFields = new FetchSourceContext(true, new String[]{RESULTS_FIELD}, new String[0]);
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
SOURCE_INDEX, buildOutlierDetectionConfig(analyzedFields), false, 100, fieldCapabilities, Collections.emptyMap());
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect());
SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities, Collections.emptyMap());
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, extractedFieldsDetector::detect);
assertThat(e.getMessage(), equalTo("A field that matches the dest.results_field [ml] already exists; " +
"please set a different results_field"));
@@ -496,11 +499,11 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
.addAggregatableField("your_field2", "float")
.addAggregatableField("your_keyword", "keyword")
.build();
FetchSourceContext analyzedFields = new FetchSourceContext(true, new String[]{RESULTS_FIELD}, new String[0]);
analyzedFields = new FetchSourceContext(true, new String[]{RESULTS_FIELD}, new String[0]);
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
SOURCE_INDEX, buildOutlierDetectionConfig(analyzedFields), true, 100, fieldCapabilities, Collections.emptyMap());
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect());
SOURCE_INDEX, buildOutlierDetectionConfig(), true, 100, fieldCapabilities, Collections.emptyMap());
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, extractedFieldsDetector::detect);
assertThat(e.getMessage(), equalTo("No field [ml] could be detected"));
}
@@ -814,10 +817,10 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
.addAggregatableField("field_1.keyword", "keyword")
.addAggregatableField("field_2", "float")
.build();
FetchSourceContext analyzedFields = new FetchSourceContext(true, new String[] { "field_1", "field_2" }, new String[0]);
analyzedFields = new FetchSourceContext(true, new String[] { "field_1", "field_2" }, new String[0]);
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
SOURCE_INDEX, buildRegressionConfig("field_2", analyzedFields), false, 100, fieldCapabilities, Collections.emptyMap());
SOURCE_INDEX, buildRegressionConfig("field_2"), false, 100, fieldCapabilities, Collections.emptyMap());
Tuple<ExtractedFields, List<FieldSelection>> fieldExtraction = extractedFieldsDetector.detect();
assertThat(fieldExtraction.v1().getAllFields().size(), equalTo(2));
@@ -832,38 +835,76 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
);
}
private static DataFrameAnalyticsConfig buildOutlierDetectionConfig() {
return buildOutlierDetectionConfig(null);
public void testDetect_GivenSourceFilteringWithIncludes() {
FieldCapabilitiesResponse fieldCapabilities = new MockFieldCapsResponseBuilder()
.addAggregatableField("field_11", "float")
.addAggregatableField("field_12", "float")
.addAggregatableField("field_21", "float")
.addAggregatableField("field_22", "float").build();
sourceFiltering = new FetchSourceContext(true, new String[] {"field_1*"}, null);
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities, Collections.emptyMap());
Tuple<ExtractedFields, List<FieldSelection>> fieldExtraction = extractedFieldsDetector.detect();
List<ExtractedField> allFields = fieldExtraction.v1().getAllFields();
assertThat(allFields.size(), equalTo(2));
assertThat(allFields.get(0).getName(), equalTo("field_11"));
assertThat(allFields.get(1).getName(), equalTo("field_12"));
assertFieldSelectionContains(fieldExtraction.v2(),
FieldSelection.included("field_11", Collections.singleton("float"), false, FieldSelection.FeatureType.NUMERICAL),
FieldSelection.included("field_12", Collections.singleton("float"), false, FieldSelection.FeatureType.NUMERICAL));
}
private static DataFrameAnalyticsConfig buildOutlierDetectionConfig(FetchSourceContext analyzedFields) {
public void testDetect_GivenSourceFilteringWithExcludes() {
FieldCapabilitiesResponse fieldCapabilities = new MockFieldCapsResponseBuilder()
.addAggregatableField("field_11", "float")
.addAggregatableField("field_12", "float")
.addAggregatableField("field_21", "float")
.addAggregatableField("field_22", "float").build();
sourceFiltering = new FetchSourceContext(true, null, new String[] {"field_1*"});
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities, Collections.emptyMap());
Tuple<ExtractedFields, List<FieldSelection>> fieldExtraction = extractedFieldsDetector.detect();
List<ExtractedField> allFields = fieldExtraction.v1().getAllFields();
assertThat(allFields.size(), equalTo(2));
assertThat(allFields.get(0).getName(), equalTo("field_21"));
assertThat(allFields.get(1).getName(), equalTo("field_22"));
assertFieldSelectionContains(fieldExtraction.v2(),
FieldSelection.included("field_21", Collections.singleton("float"), false, FieldSelection.FeatureType.NUMERICAL),
FieldSelection.included("field_22", Collections.singleton("float"), false, FieldSelection.FeatureType.NUMERICAL));
}
private DataFrameAnalyticsConfig buildOutlierDetectionConfig() {
return new DataFrameAnalyticsConfig.Builder()
.setId("foo")
.setSource(new DataFrameAnalyticsSource(SOURCE_INDEX, null))
.setSource(new DataFrameAnalyticsSource(SOURCE_INDEX, null, sourceFiltering))
.setDest(new DataFrameAnalyticsDest(DEST_INDEX, RESULTS_FIELD))
.setAnalyzedFields(analyzedFields)
.setAnalysis(new OutlierDetection.Builder().build())
.build();
}
private static DataFrameAnalyticsConfig buildRegressionConfig(String dependentVariable) {
return buildRegressionConfig(dependentVariable, null);
}
private static DataFrameAnalyticsConfig buildRegressionConfig(String dependentVariable, FetchSourceContext analyzedFields) {
private DataFrameAnalyticsConfig buildRegressionConfig(String dependentVariable) {
return new DataFrameAnalyticsConfig.Builder()
.setId("foo")
.setSource(new DataFrameAnalyticsSource(SOURCE_INDEX, null))
.setSource(new DataFrameAnalyticsSource(SOURCE_INDEX, null, sourceFiltering))
.setDest(new DataFrameAnalyticsDest(DEST_INDEX, RESULTS_FIELD))
.setAnalyzedFields(analyzedFields)
.setAnalysis(new Regression(dependentVariable))
.build();
}
private static DataFrameAnalyticsConfig buildClassificationConfig(String dependentVariable) {
private DataFrameAnalyticsConfig buildClassificationConfig(String dependentVariable) {
return new DataFrameAnalyticsConfig.Builder()
.setId("foo")
.setSource(new DataFrameAnalyticsSource(SOURCE_INDEX, null))
.setSource(new DataFrameAnalyticsSource(SOURCE_INDEX, null, sourceFiltering))
.setDest(new DataFrameAnalyticsDest(DEST_INDEX, RESULTS_FIELD))
.setAnalysis(new Classification(dependentVariable))
.build();
View File
@@ -71,7 +71,7 @@ public class AnalyticsResultProcessorTests extends ESTestCase {
analyticsConfig = new DataFrameAnalyticsConfig.Builder()
.setId(JOB_ID)
.setDescription(JOB_DESCRIPTION)
.setSource(new DataFrameAnalyticsSource(new String[] {"my_source"}, null))
.setSource(new DataFrameAnalyticsSource(new String[] {"my_source"}, null, null))
.setDest(new DataFrameAnalyticsDest("my_dest", null))
.setAnalysis(new Regression("foo"))
.build();
View File
@@ -41,7 +41,8 @@ setup:
{
"source": {
"index": "index-source",
"query": {"term" : { "user" : "Kimchy" }}
"query": {"term" : { "user" : "Kimchy" }},
"_source": [ "obj1.*", "obj2.*" ]
},
"dest": {
"index": "index-dest"
@@ -1852,3 +1853,28 @@ setup:
}}
- is_true: create_time
- is_true: version
---
"Test put config given analyzed_fields include field excluded by source":
- do:
catch: /field \[excluded\] is included in \[analyzed_fields\] but not in \[source._source\]/
ml.put_data_frame_analytics:
id: "analyzed_fields-include-field-excluded-by-source"
body: >
{
"source": {
"index": "index-source",
"query": {"term" : { "user" : "Kimchy" }},
"_source": {
"excludes": ["excluded"]
}
},
"dest": {
"index": "index-dest"
},
"analysis": {"outlier_detection":{}},
"analyzed_fields": {
"includes": ["excluded"]
}
}