From 1d64d55a863a19de2a349bde2dfc47ece980137f Mon Sep 17 00:00:00 2001 From: David Roberts Date: Sat, 6 Jun 2020 08:15:17 +0100 Subject: [PATCH] [7.x][ML] Add per-partition categorization option (#57723) This PR adds the initial Java side changes to enable use of the per-partition categorization functionality added in elastic/ml-cpp#1293. There will be a followup change to complete the work, as there cannot be any end-to-end integration tests until elastic/ml-cpp#1293 is merged, and also elastic/ml-cpp#1293 does not implement some of the more peripheral functionality, like stop_on_warn and per-partition stats documents. The changes so far cover REST APIs, results object formats, HLRC and docs. Backport of #57683 --- .../client/ml/GetCategoriesRequest.java | 27 ++++- .../client/ml/job/config/AnalysisConfig.java | 31 +++++- .../client/ml/job/config/JobUpdate.java | 31 +++++- .../PerPartitionCategorizationConfig.java | 95 ++++++++++++++++ .../ml/job/results/CategoryDefinition.java | 33 +++++- .../client/ml/GetCategoriesRequestTests.java | 3 + .../ml/job/config/AnalysisConfigTests.java | 10 +- ...PerPartitionCategorizationConfigTests.java | 42 +++++++ .../job/results/CategoryDefinitionTests.java | 6 + .../apis/get-category.asciidoc | 26 ++++- .../anomaly-detection/apis/put-job.asciidoc | 18 +++ .../apis/update-job.asciidoc | 18 +++ docs/reference/ml/ml-shared.asciidoc | 19 ++++ .../core/ml/action/GetCategoriesAction.java | 28 ++++- .../core/ml/action/UpdateProcessAction.java | 21 +++- .../core/ml/job/config/AnalysisConfig.java | 86 ++++++++++++-- .../xpack/core/ml/job/config/JobUpdate.java | 56 ++++++++-- .../PerPartitionCategorizationConfig.java | 105 ++++++++++++++++++ .../core/ml/job/results/AnomalyRecord.java | 7 +- .../ml/job/results/CategoryDefinition.java | 47 ++++++++ .../ml/job/results/ReservedFieldNames.java | 4 + .../xpack/core/ml/config_index_mappings.json | 10 ++ .../ml/action/GetCategoriesRequestTests.java | 3 + .../action/UpdateJobActionRequestTests.java 
| 3 +- .../UpdateProcessActionRequestTests.java | 12 +- .../ml/job/config/AnalysisConfigTests.java | 87 ++++++++++++++- .../core/ml/job/config/JobUpdateTests.java | 18 +++ ...PerPartitionCategorizationConfigTests.java | 45 ++++++++ .../AutodetectResultProcessorIT.java | 9 +- .../action/TransportGetCategoriesAction.java | 4 +- .../action/TransportUpdateProcessAction.java | 1 + .../ml/job/UpdateJobProcessNotifier.java | 4 +- .../job/persistence/JobResultsProvider.java | 23 ++-- .../autodetect/AutodetectCommunicator.java | 4 + .../process/autodetect/AutodetectProcess.java | 9 ++ .../BlackHoleAutodetectProcess.java | 5 + .../autodetect/NativeAutodetectProcess.java | 7 ++ .../job/process/autodetect/UpdateParams.java | 24 +++- .../autodetect/UpdateProcessMessage.java | 20 +++- .../autodetect/writer/FieldConfigWriter.java | 11 +- .../rest/results/RestGetCategoriesAction.java | 1 + .../persistence/JobResultsProviderTests.java | 99 ++++++++--------- .../process/autodetect/UpdateParamsTests.java | 14 ++- .../writer/FieldConfigWriterTests.java | 19 ++++ .../job/results/CategoryDefinitionTests.java | 20 +++- .../rest-api-spec/api/ml.get_categories.json | 4 + .../test/ml/jobs_get_result_categories.yml | 58 +++++++++- 47 files changed, 1079 insertions(+), 148 deletions(-) create mode 100644 client/rest-high-level/src/main/java/org/elasticsearch/client/ml/job/config/PerPartitionCategorizationConfig.java create mode 100644 client/rest-high-level/src/test/java/org/elasticsearch/client/ml/job/config/PerPartitionCategorizationConfigTests.java create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/PerPartitionCategorizationConfig.java create mode 100644 x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/PerPartitionCategorizationConfigTests.java diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/GetCategoriesRequest.java 
b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/GetCategoriesRequest.java index b1000c3e4eb..aca18b61a35 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/GetCategoriesRequest.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/GetCategoriesRequest.java @@ -22,6 +22,7 @@ import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.client.core.PageParams; import org.elasticsearch.client.ml.job.config.Job; +import org.elasticsearch.client.ml.job.results.CategoryDefinition; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.ToXContentObject; @@ -35,8 +36,8 @@ import java.util.Objects; */ public class GetCategoriesRequest extends ActionRequest implements ToXContentObject { - - public static final ParseField CATEGORY_ID = new ParseField("category_id"); + public static final ParseField CATEGORY_ID = CategoryDefinition.CATEGORY_ID; + public static final ParseField PARTITION_FIELD_VALUE = CategoryDefinition.PARTITION_FIELD_VALUE; public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( "get_categories_request", a -> new GetCategoriesRequest((String) a[0])); @@ -46,11 +47,13 @@ public class GetCategoriesRequest extends ActionRequest implements ToXContentObj PARSER.declareString(ConstructingObjectParser.constructorArg(), Job.ID); PARSER.declareLong(GetCategoriesRequest::setCategoryId, CATEGORY_ID); PARSER.declareObject(GetCategoriesRequest::setPageParams, PageParams.PARSER, PageParams.PAGE); + PARSER.declareString(GetCategoriesRequest::setPartitionFieldValue, PARTITION_FIELD_VALUE); } private final String jobId; private Long categoryId; private PageParams pageParams; + private String partitionFieldValue; /** * Constructs a request to retrieve category information from a given job @@ -88,6 +91,18 @@ 
public class GetCategoriesRequest extends ActionRequest implements ToXContentObj this.pageParams = pageParams; } + public String getPartitionFieldValue() { + return partitionFieldValue; + } + + /** + * Sets the partition field value + * @param partitionFieldValue the partition field value + */ + public void setPartitionFieldValue(String partitionFieldValue) { + this.partitionFieldValue = partitionFieldValue; + } + @Override public ActionRequestValidationException validate() { return null; @@ -103,6 +118,9 @@ public class GetCategoriesRequest extends ActionRequest implements ToXContentObj if (pageParams != null) { builder.field(PageParams.PAGE.getPreferredName(), pageParams); } + if (partitionFieldValue != null) { + builder.field(PARTITION_FIELD_VALUE.getPreferredName(), partitionFieldValue); + } builder.endObject(); return builder; } @@ -118,11 +136,12 @@ public class GetCategoriesRequest extends ActionRequest implements ToXContentObj GetCategoriesRequest request = (GetCategoriesRequest) obj; return Objects.equals(jobId, request.jobId) && Objects.equals(categoryId, request.categoryId) - && Objects.equals(pageParams, request.pageParams); + && Objects.equals(pageParams, request.pageParams) + && Objects.equals(partitionFieldValue, request.partitionFieldValue); } @Override public int hashCode() { - return Objects.hash(jobId, categoryId, pageParams); + return Objects.hash(jobId, categoryId, pageParams, partitionFieldValue); } } diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/job/config/AnalysisConfig.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/job/config/AnalysisConfig.java index f3049be2b5e..4bbe819cffd 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/job/config/AnalysisConfig.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/job/config/AnalysisConfig.java @@ -56,6 +56,7 @@ public class AnalysisConfig implements ToXContentObject { public static final 
ParseField CATEGORIZATION_FIELD_NAME = new ParseField("categorization_field_name"); public static final ParseField CATEGORIZATION_FILTERS = new ParseField("categorization_filters"); public static final ParseField CATEGORIZATION_ANALYZER = CategorizationAnalyzerConfig.CATEGORIZATION_ANALYZER; + public static final ParseField PER_PARTITION_CATEGORIZATION = new ParseField("per_partition_categorization"); public static final ParseField LATENCY = new ParseField("latency"); public static final ParseField SUMMARY_COUNT_FIELD_NAME = new ParseField("summary_count_field_name"); public static final ParseField DETECTORS = new ParseField("detectors"); @@ -78,6 +79,8 @@ public class AnalysisConfig implements ToXContentObject { PARSER.declareField(Builder::setCategorizationAnalyzerConfig, (p, c) -> CategorizationAnalyzerConfig.buildFromXContentFragment(p), CATEGORIZATION_ANALYZER, ObjectParser.ValueType.OBJECT_OR_STRING); + PARSER.declareObject(Builder::setPerPartitionCategorizationConfig, PerPartitionCategorizationConfig.PARSER, + PER_PARTITION_CATEGORIZATION); PARSER.declareString((builder, val) -> builder.setLatency(TimeValue.parseTimeValue(val, LATENCY.getPreferredName())), LATENCY); PARSER.declareString(Builder::setSummaryCountFieldName, SUMMARY_COUNT_FIELD_NAME); @@ -92,6 +95,7 @@ public class AnalysisConfig implements ToXContentObject { private final String categorizationFieldName; private final List categorizationFilters; private final CategorizationAnalyzerConfig categorizationAnalyzerConfig; + private final PerPartitionCategorizationConfig perPartitionCategorizationConfig; private final TimeValue latency; private final String summaryCountFieldName; private final List detectors; @@ -99,13 +103,15 @@ public class AnalysisConfig implements ToXContentObject { private final Boolean multivariateByFields; private AnalysisConfig(TimeValue bucketSpan, String categorizationFieldName, List categorizationFilters, - CategorizationAnalyzerConfig categorizationAnalyzerConfig, 
TimeValue latency, String summaryCountFieldName, - List detectors, List influencers, Boolean multivariateByFields) { + CategorizationAnalyzerConfig categorizationAnalyzerConfig, + PerPartitionCategorizationConfig perPartitionCategorizationConfig, TimeValue latency, + String summaryCountFieldName, List detectors, List influencers, Boolean multivariateByFields) { this.detectors = Collections.unmodifiableList(detectors); this.bucketSpan = bucketSpan; this.latency = latency; this.categorizationFieldName = categorizationFieldName; this.categorizationAnalyzerConfig = categorizationAnalyzerConfig; + this.perPartitionCategorizationConfig = perPartitionCategorizationConfig; this.categorizationFilters = categorizationFilters == null ? null : Collections.unmodifiableList(categorizationFilters); this.summaryCountFieldName = summaryCountFieldName; this.influencers = Collections.unmodifiableList(influencers); @@ -133,6 +139,10 @@ public class AnalysisConfig implements ToXContentObject { return categorizationAnalyzerConfig; } + public PerPartitionCategorizationConfig getPerPartitionCategorizationConfig() { + return perPartitionCategorizationConfig; + } + /** * The latency interval during which out-of-order records should be handled. * @@ -226,6 +236,9 @@ public class AnalysisConfig implements ToXContentObject { // gets written as a single string. 
categorizationAnalyzerConfig.toXContent(builder, params); } + if (perPartitionCategorizationConfig != null) { + builder.field(PER_PARTITION_CATEGORIZATION.getPreferredName(), perPartitionCategorizationConfig); + } if (latency != null) { builder.field(LATENCY.getPreferredName(), latency.getStringRep()); } @@ -261,6 +274,7 @@ public class AnalysisConfig implements ToXContentObject { Objects.equals(categorizationFieldName, that.categorizationFieldName) && Objects.equals(categorizationFilters, that.categorizationFilters) && Objects.equals(categorizationAnalyzerConfig, that.categorizationAnalyzerConfig) && + Objects.equals(perPartitionCategorizationConfig, that.perPartitionCategorizationConfig) && Objects.equals(summaryCountFieldName, that.summaryCountFieldName) && Objects.equals(detectors, that.detectors) && Objects.equals(influencers, that.influencers) && @@ -270,8 +284,8 @@ public class AnalysisConfig implements ToXContentObject { @Override public int hashCode() { return Objects.hash( - bucketSpan, categorizationFieldName, categorizationFilters, categorizationAnalyzerConfig, latency, - summaryCountFieldName, detectors, influencers, multivariateByFields); + bucketSpan, categorizationFieldName, categorizationFilters, categorizationAnalyzerConfig, perPartitionCategorizationConfig, + latency, summaryCountFieldName, detectors, influencers, multivariateByFields); } public static Builder builder(List detectors) { @@ -286,6 +300,7 @@ public class AnalysisConfig implements ToXContentObject { private String categorizationFieldName; private List categorizationFilters; private CategorizationAnalyzerConfig categorizationAnalyzerConfig; + private PerPartitionCategorizationConfig perPartitionCategorizationConfig; private String summaryCountFieldName; private List influencers = new ArrayList<>(); private Boolean multivariateByFields; @@ -302,6 +317,7 @@ public class AnalysisConfig implements ToXContentObject { this.categorizationFilters = analysisConfig.categorizationFilters == null 
? null : new ArrayList<>(analysisConfig.categorizationFilters); this.categorizationAnalyzerConfig = analysisConfig.categorizationAnalyzerConfig; + this.perPartitionCategorizationConfig = analysisConfig.perPartitionCategorizationConfig; this.summaryCountFieldName = analysisConfig.summaryCountFieldName; this.influencers = new ArrayList<>(analysisConfig.influencers); this.multivariateByFields = analysisConfig.multivariateByFields; @@ -351,6 +367,11 @@ public class AnalysisConfig implements ToXContentObject { return this; } + public Builder setPerPartitionCategorizationConfig(PerPartitionCategorizationConfig perPartitionCategorizationConfig) { + this.perPartitionCategorizationConfig = perPartitionCategorizationConfig; + return this; + } + public Builder setSummaryCountFieldName(String summaryCountFieldName) { this.summaryCountFieldName = summaryCountFieldName; return this; @@ -369,7 +390,7 @@ public class AnalysisConfig implements ToXContentObject { public AnalysisConfig build() { return new AnalysisConfig(bucketSpan, categorizationFieldName, categorizationFilters, categorizationAnalyzerConfig, - latency, summaryCountFieldName, detectors, influencers, multivariateByFields); + perPartitionCategorizationConfig, latency, summaryCountFieldName, detectors, influencers, multivariateByFields); } } } diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/job/config/JobUpdate.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/job/config/JobUpdate.java index c68ba523bd5..4d662468f6d 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/job/config/JobUpdate.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/job/config/JobUpdate.java @@ -54,6 +54,8 @@ public class JobUpdate implements ToXContentObject { PARSER.declareLong(Builder::setModelSnapshotRetentionDays, Job.MODEL_SNAPSHOT_RETENTION_DAYS); PARSER.declareLong(Builder::setDailyModelSnapshotRetentionAfterDays, 
Job.DAILY_MODEL_SNAPSHOT_RETENTION_AFTER_DAYS); PARSER.declareStringArray(Builder::setCategorizationFilters, AnalysisConfig.CATEGORIZATION_FILTERS); + PARSER.declareObject(Builder::setPerPartitionCategorizationConfig, PerPartitionCategorizationConfig.PARSER, + AnalysisConfig.PER_PARTITION_CATEGORIZATION); PARSER.declareField(Builder::setCustomSettings, (p, c) -> p.map(), Job.CUSTOM_SETTINGS, ObjectParser.ValueType.OBJECT); PARSER.declareBoolean(Builder::setAllowLazyOpen, Job.ALLOW_LAZY_OPEN); } @@ -70,6 +72,7 @@ public class JobUpdate implements ToXContentObject { private final Long dailyModelSnapshotRetentionAfterDays; private final Long resultsRetentionDays; private final List categorizationFilters; + private final PerPartitionCategorizationConfig perPartitionCategorizationConfig; private final Map customSettings; private final Boolean allowLazyOpen; @@ -79,6 +82,7 @@ public class JobUpdate implements ToXContentObject { @Nullable Long renormalizationWindowDays, @Nullable Long resultsRetentionDays, @Nullable Long modelSnapshotRetentionDays, @Nullable Long dailyModelSnapshotRetentionAfterDays, @Nullable List categorizationFilters, + @Nullable PerPartitionCategorizationConfig perPartitionCategorizationConfig, @Nullable Map customSettings, @Nullable Boolean allowLazyOpen) { this.jobId = jobId; this.groups = groups; @@ -92,6 +96,7 @@ public class JobUpdate implements ToXContentObject { this.dailyModelSnapshotRetentionAfterDays = dailyModelSnapshotRetentionAfterDays; this.resultsRetentionDays = resultsRetentionDays; this.categorizationFilters = categorizationFilters; + this.perPartitionCategorizationConfig = perPartitionCategorizationConfig; this.customSettings = customSettings; this.allowLazyOpen = allowLazyOpen; } @@ -140,6 +145,10 @@ public class JobUpdate implements ToXContentObject { return categorizationFilters; } + public PerPartitionCategorizationConfig getPerPartitionCategorizationConfig() { + return perPartitionCategorizationConfig; + } + public Map 
getCustomSettings() { return customSettings; } @@ -185,6 +194,9 @@ public class JobUpdate implements ToXContentObject { if (categorizationFilters != null) { builder.field(AnalysisConfig.CATEGORIZATION_FILTERS.getPreferredName(), categorizationFilters); } + if (perPartitionCategorizationConfig != null) { + builder.field(AnalysisConfig.PER_PARTITION_CATEGORIZATION.getPreferredName(), perPartitionCategorizationConfig); + } if (customSettings != null) { builder.field(Job.CUSTOM_SETTINGS.getPreferredName(), customSettings); } @@ -219,6 +231,7 @@ public class JobUpdate implements ToXContentObject { && Objects.equals(this.dailyModelSnapshotRetentionAfterDays, that.dailyModelSnapshotRetentionAfterDays) && Objects.equals(this.resultsRetentionDays, that.resultsRetentionDays) && Objects.equals(this.categorizationFilters, that.categorizationFilters) + && Objects.equals(this.perPartitionCategorizationConfig, that.perPartitionCategorizationConfig) && Objects.equals(this.customSettings, that.customSettings) && Objects.equals(this.allowLazyOpen, that.allowLazyOpen); } @@ -227,7 +240,7 @@ public class JobUpdate implements ToXContentObject { public int hashCode() { return Objects.hash(jobId, groups, description, detectorUpdates, modelPlotConfig, analysisLimits, renormalizationWindowDays, backgroundPersistInterval, modelSnapshotRetentionDays, dailyModelSnapshotRetentionAfterDays, resultsRetentionDays, - categorizationFilters, customSettings, allowLazyOpen); + categorizationFilters, perPartitionCategorizationConfig, customSettings, allowLazyOpen); } public static class DetectorUpdate implements ToXContentObject { @@ -323,6 +336,7 @@ public class JobUpdate implements ToXContentObject { private Long dailyModelSnapshotRetentionAfterDays; private Long resultsRetentionDays; private List categorizationFilters; + private PerPartitionCategorizationConfig perPartitionCategorizationConfig; private Map customSettings; private Boolean allowLazyOpen; @@ -468,6 +482,19 @@ public class JobUpdate 
implements ToXContentObject { return this; } + /** + * Sets the per-partition categorization options on the {@link Job} + * + * Updates the {@link AnalysisConfig#perPartitionCategorizationConfig} setting. + * Requires {@link AnalysisConfig#perPartitionCategorizationConfig} to have been set on the existing Job. + * + * @param perPartitionCategorizationConfig per-partition categorization options for the Job's {@link AnalysisConfig} + */ + public Builder setPerPartitionCategorizationConfig(PerPartitionCategorizationConfig perPartitionCategorizationConfig) { + this.perPartitionCategorizationConfig = perPartitionCategorizationConfig; + return this; + } + /** * Contains custom meta data about the job. * @@ -488,7 +515,7 @@ public class JobUpdate implements ToXContentObject { public JobUpdate build() { return new JobUpdate(jobId, groups, description, detectorUpdates, modelPlotConfig, analysisLimits, backgroundPersistInterval, renormalizationWindowDays, resultsRetentionDays, modelSnapshotRetentionDays, dailyModelSnapshotRetentionAfterDays, - categorizationFilters, customSettings, allowLazyOpen); + categorizationFilters, perPartitionCategorizationConfig, customSettings, allowLazyOpen); } } } diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/job/config/PerPartitionCategorizationConfig.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/job/config/PerPartitionCategorizationConfig.java new file mode 100644 index 00000000000..52aeb4a4de5 --- /dev/null +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/job/config/PerPartitionCategorizationConfig.java @@ -0,0 +1,95 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. 
Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.client.ml.job.config; + +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Objects; + +public class PerPartitionCategorizationConfig implements ToXContentObject { + + public static final ParseField TYPE_FIELD = new ParseField("per_partition_categorization"); + public static final ParseField ENABLED_FIELD = new ParseField("enabled"); + public static final ParseField STOP_ON_WARN = new ParseField("stop_on_warn"); + + public static final ConstructingObjectParser PARSER = + new ConstructingObjectParser<>(TYPE_FIELD.getPreferredName(), true, + a -> new PerPartitionCategorizationConfig((boolean) a[0], (Boolean) a[1])); + + static { + PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), ENABLED_FIELD); + PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), STOP_ON_WARN); + } + + private final boolean enabled; + private final boolean stopOnWarn; + + public PerPartitionCategorizationConfig() { + this(false, null); + } + + public PerPartitionCategorizationConfig(boolean enabled, Boolean stopOnWarn) { + this.enabled = enabled; + this.stopOnWarn = 
(stopOnWarn == null) ? false : stopOnWarn; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { + builder.startObject(); + builder.field(ENABLED_FIELD.getPreferredName(), enabled); + if (enabled) { + builder.field(STOP_ON_WARN.getPreferredName(), stopOnWarn); + } + builder.endObject(); + return builder; + } + + public boolean isEnabled() { + return enabled; + } + + public boolean isStopOnWarn() { + return stopOnWarn; + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + + if (other instanceof PerPartitionCategorizationConfig == false) { + return false; + } + + PerPartitionCategorizationConfig that = (PerPartitionCategorizationConfig) other; + return this.enabled == that.enabled && this.stopOnWarn == that.stopOnWarn; + } + + @Override + public int hashCode() { + return Objects.hash(enabled, stopOnWarn); + } +} diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/job/results/CategoryDefinition.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/job/results/CategoryDefinition.java index e34d96153f8..cb441e3e100 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/job/results/CategoryDefinition.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/job/results/CategoryDefinition.java @@ -38,6 +38,8 @@ public class CategoryDefinition implements ToXContentObject { public static final ParseField TYPE = new ParseField("category_definition"); public static final ParseField CATEGORY_ID = new ParseField("category_id"); + public static final ParseField PARTITION_FIELD_NAME = new ParseField("partition_field_name"); + public static final ParseField PARTITION_FIELD_VALUE = new ParseField("partition_field_value"); public static final ParseField TERMS = new ParseField("terms"); public static final ParseField REGEX = new ParseField("regex"); public static final ParseField 
MAX_MATCHING_LENGTH = new ParseField("max_matching_length"); @@ -55,6 +57,8 @@ public class CategoryDefinition implements ToXContentObject { static { PARSER.declareString(ConstructingObjectParser.constructorArg(), Job.ID); PARSER.declareLong(CategoryDefinition::setCategoryId, CATEGORY_ID); + PARSER.declareString(CategoryDefinition::setPartitionFieldName, PARTITION_FIELD_NAME); + PARSER.declareString(CategoryDefinition::setPartitionFieldValue, PARTITION_FIELD_VALUE); PARSER.declareString(CategoryDefinition::setTerms, TERMS); PARSER.declareString(CategoryDefinition::setRegex, REGEX); PARSER.declareLong(CategoryDefinition::setMaxMatchingLength, MAX_MATCHING_LENGTH); @@ -66,6 +70,8 @@ public class CategoryDefinition implements ToXContentObject { private final String jobId; private long categoryId = 0L; + private String partitionFieldName; + private String partitionFieldValue; private String terms = ""; private String regex = ""; private long maxMatchingLength = 0L; @@ -90,6 +96,22 @@ public class CategoryDefinition implements ToXContentObject { this.categoryId = categoryId; } + public String getPartitionFieldName() { + return partitionFieldName; + } + + public void setPartitionFieldName(String partitionFieldName) { + this.partitionFieldName = partitionFieldName; + } + + public String getPartitionFieldValue() { + return partitionFieldValue; + } + + public void setPartitionFieldValue(String partitionFieldValue) { + this.partitionFieldValue = partitionFieldValue; + } + public String getTerms() { return terms; } @@ -156,6 +178,12 @@ public class CategoryDefinition implements ToXContentObject { builder.startObject(); builder.field(Job.ID.getPreferredName(), jobId); builder.field(CATEGORY_ID.getPreferredName(), categoryId); + if (partitionFieldName != null) { + builder.field(PARTITION_FIELD_NAME.getPreferredName(), partitionFieldName); + } + if (partitionFieldValue != null) { + builder.field(PARTITION_FIELD_VALUE.getPreferredName(), partitionFieldValue); + } 
builder.field(TERMS.getPreferredName(), terms); builder.field(REGEX.getPreferredName(), regex); builder.field(MAX_MATCHING_LENGTH.getPreferredName(), maxMatchingLength); @@ -182,6 +210,8 @@ public class CategoryDefinition implements ToXContentObject { CategoryDefinition that = (CategoryDefinition) other; return Objects.equals(this.jobId, that.jobId) && Objects.equals(this.categoryId, that.categoryId) + && Objects.equals(this.partitionFieldName, that.partitionFieldName) + && Objects.equals(this.partitionFieldValue, that.partitionFieldValue) && Objects.equals(this.terms, that.terms) && Objects.equals(this.regex, that.regex) && Objects.equals(this.maxMatchingLength, that.maxMatchingLength) @@ -193,6 +223,7 @@ public class CategoryDefinition implements ToXContentObject { @Override public int hashCode() { - return Objects.hash(jobId, categoryId, terms, regex, maxMatchingLength, examples, preferredToCategories, numMatches, grokPattern); + return Objects.hash(jobId, categoryId, partitionFieldName, partitionFieldValue, terms, regex, maxMatchingLength, examples, + preferredToCategories, numMatches, grokPattern); } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/GetCategoriesRequestTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/GetCategoriesRequestTests.java index bcc697fdb20..063516a9982 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/GetCategoriesRequestTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/GetCategoriesRequestTests.java @@ -36,6 +36,9 @@ public class GetCategoriesRequestTests extends AbstractXContentTestCase { + + @Override + protected PerPartitionCategorizationConfig createTestInstance() { + boolean enabled = randomBoolean(); + return new PerPartitionCategorizationConfig(enabled, randomBoolean() ? 
null : enabled && randomBoolean()); + } + + @Override + protected PerPartitionCategorizationConfig doParseInstance(XContentParser parser) { + return PerPartitionCategorizationConfig.PARSER.apply(parser, null); + } + + @Override + protected boolean supportsUnknownFields() { + return true; + } +} diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/job/results/CategoryDefinitionTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/job/results/CategoryDefinitionTests.java index 607fcd843fb..65ab8d8a4cc 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/job/results/CategoryDefinitionTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/job/results/CategoryDefinitionTests.java @@ -31,6 +31,10 @@ public class CategoryDefinitionTests extends AbstractXContentTestCase`:: -(Optional, long) Identifier for the category. If you do not specify this -parameter, the API returns information about all categories in the {anomaly-job}. +(Optional, long) Identifier for the category, which is unique in the job. If you +specify neither the category ID nor the `partition_field_value`, the API returns +information about all categories. If you specify only the +`partition_field_value`, it returns information about all categories for the +specified partition. ``:: (Required, string) @@ -59,6 +62,9 @@ include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=job-id-anomaly-detection] `page`.`size`:: (Optional, integer) Specifies the maximum number of categories to obtain. +`partition_field_value`:: +(Optional, string) Only return categories for the specified partition. + [[ml-get-category-results]] ==== {api-response-body-title} @@ -66,7 +72,9 @@ The API returns an array of category objects, which have the following properties: `category_id`:: -(unsigned integer) A unique identifier for the category. +(unsigned integer) A unique identifier for the category. 
`category_id` is unique +at the job level, even when per-partition categorization is enabled. + `examples`:: (array) A list of examples of actual values that matched the category. @@ -87,6 +95,18 @@ include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=job-id-anomaly-detection] The value is increased by 10% to enable matching for similar fields that have not been analyzed. +// This doesn't use the shared description because there are +// categorization-specific aspects to its use in this context +`partition_field_name`:: +(string) If per-partition categorization is enabled, this property identifies +the field used to segment the categorization. It is not present when +per-partition categorization is disabled. + +`partition_field_value`:: +(string) If per-partition categorization is enabled, this property identifies +the value of the `partition_field_name` for the category. It is not present when +per-partition categorization is disabled. + `regex`:: (string) A regular expression that is used to search for values that match the category. 
diff --git a/docs/reference/ml/anomaly-detection/apis/put-job.asciidoc b/docs/reference/ml/anomaly-detection/apis/put-job.asciidoc index b694b2c7f7d..a55e7f5d23a 100644 --- a/docs/reference/ml/anomaly-detection/apis/put-job.asciidoc +++ b/docs/reference/ml/anomaly-detection/apis/put-job.asciidoc @@ -186,6 +186,24 @@ include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=latency] (boolean) include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=multivariate-by-fields] +//Begin analysis_config.per_partition_categorization +`per_partition_categorization`::: +(Optional, object) +include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=per-partition-categorization] ++ +.Properties of `per_partition_categorization` +[%collapsible%open] +===== +`enabled`:::: +(boolean) +include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=per-partition-categorization-enabled] + +`stop_on_warn`:::: +(boolean) +include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=per-partition-categorization-stop-on-warn] +===== +//End analysis_config.per_partition_categorization + `summary_count_field_name`::: (string) include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=summary-count-field-name] diff --git a/docs/reference/ml/anomaly-detection/apis/update-job.asciidoc b/docs/reference/ml/anomaly-detection/apis/update-job.asciidoc index 52ab1481268..2722defd210 100644 --- a/docs/reference/ml/anomaly-detection/apis/update-job.asciidoc +++ b/docs/reference/ml/anomaly-detection/apis/update-job.asciidoc @@ -198,6 +198,24 @@ include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=model-plot-config-terms] (long) include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=model-snapshot-retention-days] +//Begin per_partition_categorization +`per_partition_categorization`::: +(object) +include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=per-partition-categorization] ++ +.Properties of `per_partition_categorization` +[%collapsible%open] +==== +`enabled`::: +(boolean) +include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=per-partition-categorization-enabled] + 
+`stop_on_warn`::: +(boolean) +include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=per-partition-categorization-stop-on-warn] +==== +//End per_partition_categorization + `renormalization_window_days`:: (long) include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=renormalization-window-days] diff --git a/docs/reference/ml/ml-shared.asciidoc b/docs/reference/ml/ml-shared.asciidoc index 882561ee691..22a0c83930c 100644 --- a/docs/reference/ml/ml-shared.asciidoc +++ b/docs/reference/ml/ml-shared.asciidoc @@ -1108,6 +1108,25 @@ The field used to segment the analysis. When you use this property, you have completely independent baselines for each value of this field. end::partition-field-name[] +tag::per-partition-categorization[] +Settings related to how categorization interacts with partition fields. +end::per-partition-categorization[] + +tag::per-partition-categorization-enabled[] +To enable this setting, you must also set the `partition_field_name` property to +the same value in every detector that uses the keyword `mlcategory`. Otherwise, +job creation fails. +end::per-partition-categorization-enabled[] + +tag::per-partition-categorization-stop-on-warn[] +This setting can be set to true only if per-partition categorization is enabled. +If true, both categorization and subsequent anomaly detection stop for +partitions where the categorization status changes to `warn`. This setting makes +it viable to have a job where it is expected that categorization works well for +some partitions but not others; you do not pay the cost of bad categorization +forever in the partitions where it works badly. +end::per-partition-categorization-stop-on-warn[] + tag::prediction-field-name[] Defines the name of the prediction field in the results. Defaults to `_prediction`. 
diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetCategoriesAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetCategoriesAction.java index e37cbd77bc6..1c8e796754f 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetCategoriesAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetCategoriesAction.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.core.ml.action; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.ActionRequestValidationException; @@ -40,9 +41,10 @@ public class GetCategoriesAction extends ActionType PARSER = new ObjectParser<>(NAME, Request::new); @@ -50,6 +52,7 @@ public class GetCategoriesAction extends ActionType request.jobId = jobId, Job.ID); PARSER.declareLong(Request::setCategoryId, CATEGORY_ID); PARSER.declareObject(Request::setPageParams, PageParams.PARSER, PageParams.PAGE); + PARSER.declareString(Request::setPartitionFieldValue, PARTITION_FIELD_VALUE); } public static Request parseRequest(String jobId, XContentParser parser) { @@ -63,6 +66,7 @@ public class GetCategoriesAction extends ActionType { private ModelPlotConfig modelPlotConfig; + private PerPartitionCategorizationConfig perPartitionCategorizationConfig; private List detectorUpdates; private MlFilter filter; private boolean updateScheduledEvents = false; @@ -108,6 +110,9 @@ public class UpdateProcessAction extends ActionType detectorUpdates, MlFilter filter, - boolean updateScheduledEvents) { + public Request(String jobId, ModelPlotConfig modelPlotConfig, PerPartitionCategorizationConfig perPartitionCategorizationConfig, + List detectorUpdates, MlFilter filter, boolean updateScheduledEvents) { super(jobId); this.modelPlotConfig = modelPlotConfig; + this.perPartitionCategorizationConfig = 
perPartitionCategorizationConfig; this.detectorUpdates = detectorUpdates; this.filter = filter; this.updateScheduledEvents = updateScheduledEvents; @@ -145,6 +154,10 @@ public class UpdateProcessAction extends ActionType getDetectorUpdates() { return detectorUpdates; } @@ -159,7 +172,8 @@ public class UpdateProcessAction extends ActionType CategorizationAnalyzerConfig.buildFromXContentFragment(p, ignoreUnknownFields), CATEGORIZATION_ANALYZER, ObjectParser.ValueType.OBJECT_OR_STRING); + parser.declareObject(Builder::setPerPartitionCategorizationConfig, + ignoreUnknownFields ? PerPartitionCategorizationConfig.LENIENT_PARSER : PerPartitionCategorizationConfig.STRICT_PARSER, + PER_PARTITION_CATEGORIZATION); parser.declareString((builder, val) -> builder.setLatency(TimeValue.parseTimeValue(val, LATENCY.getPreferredName())), LATENCY); parser.declareString(Builder::setSummaryCountFieldName, SUMMARY_COUNT_FIELD_NAME); @@ -102,6 +107,7 @@ public class AnalysisConfig implements ToXContentObject, Writeable { private final String categorizationFieldName; private final List categorizationFilters; private final CategorizationAnalyzerConfig categorizationAnalyzerConfig; + private final PerPartitionCategorizationConfig perPartitionCategorizationConfig; private final TimeValue latency; private final String summaryCountFieldName; private final List detectors; @@ -109,14 +115,16 @@ public class AnalysisConfig implements ToXContentObject, Writeable { private final Boolean multivariateByFields; private AnalysisConfig(TimeValue bucketSpan, String categorizationFieldName, List categorizationFilters, - CategorizationAnalyzerConfig categorizationAnalyzerConfig, TimeValue latency, String summaryCountFieldName, - List detectors, List influencers, Boolean multivariateByFields) { + CategorizationAnalyzerConfig categorizationAnalyzerConfig, + PerPartitionCategorizationConfig perPartitionCategorizationConfig, TimeValue latency, + String summaryCountFieldName, List detectors, List influencers, 
Boolean multivariateByFields) { this.detectors = detectors; this.bucketSpan = bucketSpan; this.latency = latency; this.categorizationFieldName = categorizationFieldName; this.categorizationAnalyzerConfig = categorizationAnalyzerConfig; this.categorizationFilters = categorizationFilters == null ? null : Collections.unmodifiableList(categorizationFilters); + this.perPartitionCategorizationConfig = perPartitionCategorizationConfig; this.summaryCountFieldName = summaryCountFieldName; this.influencers = Collections.unmodifiableList(influencers); this.multivariateByFields = multivariateByFields; @@ -126,10 +134,11 @@ public class AnalysisConfig implements ToXContentObject, Writeable { bucketSpan = in.readTimeValue(); categorizationFieldName = in.readOptionalString(); categorizationFilters = in.readBoolean() ? Collections.unmodifiableList(in.readStringList()) : null; - if (in.getVersion().onOrAfter(Version.V_6_2_0)) { - categorizationAnalyzerConfig = in.readOptionalWriteable(CategorizationAnalyzerConfig::new); + categorizationAnalyzerConfig = in.readOptionalWriteable(CategorizationAnalyzerConfig::new); + if (in.getVersion().onOrAfter(Version.V_7_9_0)) { + perPartitionCategorizationConfig = new PerPartitionCategorizationConfig(in); } else { - categorizationAnalyzerConfig = null; + perPartitionCategorizationConfig = new PerPartitionCategorizationConfig(); } latency = in.readOptionalTimeValue(); summaryCountFieldName = in.readOptionalString(); @@ -149,8 +158,9 @@ public class AnalysisConfig implements ToXContentObject, Writeable { } else { out.writeBoolean(false); } - if (out.getVersion().onOrAfter(Version.V_6_2_0)) { - out.writeOptionalWriteable(categorizationAnalyzerConfig); + out.writeOptionalWriteable(categorizationAnalyzerConfig); + if (out.getVersion().onOrAfter(Version.V_7_9_0)) { + perPartitionCategorizationConfig.writeTo(out); } out.writeOptionalTimeValue(latency); out.writeOptionalString(summaryCountFieldName); @@ -181,6 +191,10 @@ public class AnalysisConfig 
implements ToXContentObject, Writeable { return categorizationAnalyzerConfig; } + public PerPartitionCategorizationConfig getPerPartitionCategorizationConfig() { + return perPartitionCategorizationConfig; + } + /** * The latency interval during which out-of-order records should be handled. * @@ -332,6 +346,11 @@ public class AnalysisConfig implements ToXContentObject, Writeable { // gets written as a single string. categorizationAnalyzerConfig.toXContent(builder, params); } + // perPartitionCategorizationConfig is never null on the server side (it can be in the equivalent client class), + // but is not useful to know when categorization is not being used + if (categorizationFieldName != null) { + builder.field(PER_PARTITION_CATEGORIZATION.getPreferredName(), perPartitionCategorizationConfig); + } if (latency != null) { builder.field(LATENCY.getPreferredName(), latency.getStringRep()); } @@ -361,6 +380,7 @@ public class AnalysisConfig implements ToXContentObject, Writeable { Objects.equals(categorizationFieldName, that.categorizationFieldName) && Objects.equals(categorizationFilters, that.categorizationFilters) && Objects.equals(categorizationAnalyzerConfig, that.categorizationAnalyzerConfig) && + Objects.equals(perPartitionCategorizationConfig, that.perPartitionCategorizationConfig) && Objects.equals(summaryCountFieldName, that.summaryCountFieldName) && Objects.equals(detectors, that.detectors) && Objects.equals(influencers, that.influencers) && @@ -370,8 +390,8 @@ public class AnalysisConfig implements ToXContentObject, Writeable { @Override public int hashCode() { return Objects.hash( - bucketSpan, categorizationFieldName, categorizationFilters, categorizationAnalyzerConfig, latency, - summaryCountFieldName, detectors, influencers, multivariateByFields); + bucketSpan, categorizationFieldName, categorizationFilters, categorizationAnalyzerConfig, perPartitionCategorizationConfig, + latency, summaryCountFieldName, detectors, influencers, multivariateByFields); } 
public static class Builder { @@ -384,6 +404,7 @@ public class AnalysisConfig implements ToXContentObject, Writeable { private String categorizationFieldName; private List categorizationFilters; private CategorizationAnalyzerConfig categorizationAnalyzerConfig; + private PerPartitionCategorizationConfig perPartitionCategorizationConfig = new PerPartitionCategorizationConfig(); private String summaryCountFieldName; private List influencers = new ArrayList<>(); private Boolean multivariateByFields; @@ -400,6 +421,7 @@ public class AnalysisConfig implements ToXContentObject, Writeable { this.categorizationFilters = analysisConfig.categorizationFilters == null ? null : new ArrayList<>(analysisConfig.categorizationFilters); this.categorizationAnalyzerConfig = analysisConfig.categorizationAnalyzerConfig; + this.perPartitionCategorizationConfig = analysisConfig.perPartitionCategorizationConfig; this.summaryCountFieldName = analysisConfig.summaryCountFieldName; this.influencers = new ArrayList<>(analysisConfig.influencers); this.multivariateByFields = analysisConfig.multivariateByFields; @@ -452,6 +474,12 @@ public class AnalysisConfig implements ToXContentObject, Writeable { return this; } + public Builder setPerPartitionCategorizationConfig(PerPartitionCategorizationConfig perPartitionCategorizationConfig) { + this.perPartitionCategorizationConfig = + ExceptionsHelper.requireNonNull(perPartitionCategorizationConfig, PER_PARTITION_CATEGORIZATION.getPreferredName()); + return this; + } + public Builder setSummaryCountFieldName(String summaryCountFieldName) { this.summaryCountFieldName = summaryCountFieldName; return this; @@ -492,13 +520,51 @@ public class AnalysisConfig implements ToXContentObject, Writeable { verifyMlCategoryIsUsedWhenCategorizationFieldNameIsSet(); verifyCategorizationAnalyzer(); verifyCategorizationFilters(); + verifyConfigConsistentWithPerPartitionCategorization(); verifyNoMetricFunctionsWhenSummaryCountFieldNameIsSet(); 
verifyNoInconsistentNestedFieldNames(); return new AnalysisConfig(bucketSpan, categorizationFieldName, categorizationFilters, categorizationAnalyzerConfig, - latency, summaryCountFieldName, detectors, influencers, multivariateByFields); + perPartitionCategorizationConfig, latency, summaryCountFieldName, detectors, influencers, multivariateByFields); + } + + private void verifyConfigConsistentWithPerPartitionCategorization() { + if (perPartitionCategorizationConfig.isEnabled() == false) { + return; + } + + if (categorizationFieldName == null) { + throw ExceptionsHelper.badRequestException(CATEGORIZATION_FIELD_NAME.getPreferredName() + + " must be set when per-partition categorization is enabled"); + } + + AtomicReference singlePartitionFieldName = new AtomicReference<>(); + detectors.forEach(d -> { + String thisDetectorPartitionFieldName = d.getPartitionFieldName(); + if (d.getByOverPartitionTerms().contains(ML_CATEGORY_FIELD)) { + if (ML_CATEGORY_FIELD.equals(d.getPartitionFieldName())) { + throw ExceptionsHelper.badRequestException(ML_CATEGORY_FIELD + " cannot be used as a " + + Detector.PARTITION_FIELD_NAME_FIELD.getPreferredName() + + " when per-partition categorization is enabled"); + } + if (thisDetectorPartitionFieldName == null) { + throw ExceptionsHelper.badRequestException(Detector.PARTITION_FIELD_NAME_FIELD.getPreferredName() + + " must be set for detectors that reference " + ML_CATEGORY_FIELD + + " when per-partition categorization is enabled"); + } + } + if (thisDetectorPartitionFieldName != null) { + String previousPartitionFieldName = singlePartitionFieldName.getAndSet(thisDetectorPartitionFieldName); + if (previousPartitionFieldName != null && + previousPartitionFieldName.equals(thisDetectorPartitionFieldName) == false) { + throw ExceptionsHelper.badRequestException(Detector.PARTITION_FIELD_NAME_FIELD.getPreferredName() + + " cannot vary between detectors when per-partition categorization is enabled: [" + + previousPartitionFieldName + "] and [" + 
thisDetectorPartitionFieldName + "] are used"); + } + } + }); } private void verifyNoMetricFunctionsWhenSummaryCountFieldNameIsSet() { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdate.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdate.java index e2e384f7465..148591faf51 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdate.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdate.java @@ -54,6 +54,8 @@ public class JobUpdate implements Writeable, ToXContentObject { parser.declareLong(Builder::setModelSnapshotRetentionDays, Job.MODEL_SNAPSHOT_RETENTION_DAYS); parser.declareLong(Builder::setDailyModelSnapshotRetentionAfterDays, Job.DAILY_MODEL_SNAPSHOT_RETENTION_AFTER_DAYS); parser.declareStringArray(Builder::setCategorizationFilters, AnalysisConfig.CATEGORIZATION_FILTERS); + parser.declareObject(Builder::setPerPartitionCategorizationConfig, PerPartitionCategorizationConfig.STRICT_PARSER, + AnalysisConfig.PER_PARTITION_CATEGORIZATION); parser.declareField(Builder::setCustomSettings, (p, c) -> p.map(), Job.CUSTOM_SETTINGS, ObjectParser.ValueType.OBJECT); parser.declareBoolean(Builder::setAllowLazyOpen, Job.ALLOW_LAZY_OPEN); } @@ -76,6 +78,7 @@ public class JobUpdate implements Writeable, ToXContentObject { private final Long dailyModelSnapshotRetentionAfterDays; private final Long resultsRetentionDays; private final List categorizationFilters; + private final PerPartitionCategorizationConfig perPartitionCategorizationConfig; private final Map customSettings; private final String modelSnapshotId; private final Version modelSnapshotMinVersion; @@ -89,6 +92,7 @@ public class JobUpdate implements Writeable, ToXContentObject { @Nullable Long renormalizationWindowDays, @Nullable Long resultsRetentionDays, @Nullable Long modelSnapshotRetentionDays, @Nullable Long dailyModelSnapshotRetentionAfterDays, 
@Nullable List categorizationFilters, + @Nullable PerPartitionCategorizationConfig perPartitionCategorizationConfig, @Nullable Map customSettings, @Nullable String modelSnapshotId, @Nullable Version modelSnapshotMinVersion, @Nullable Version jobVersion, @Nullable Boolean clearJobFinishTime, @Nullable Boolean allowLazyOpen) { @@ -104,6 +108,7 @@ public class JobUpdate implements Writeable, ToXContentObject { this.dailyModelSnapshotRetentionAfterDays = dailyModelSnapshotRetentionAfterDays; this.resultsRetentionDays = resultsRetentionDays; this.categorizationFilters = categorizationFilters; + this.perPartitionCategorizationConfig = perPartitionCategorizationConfig; this.customSettings = customSettings; this.modelSnapshotId = modelSnapshotId; this.modelSnapshotMinVersion = modelSnapshotMinVersion; @@ -142,6 +147,11 @@ public class JobUpdate implements Writeable, ToXContentObject { } else { categorizationFilters = null; } + if (in.getVersion().onOrAfter(Version.V_7_9_0)) { + perPartitionCategorizationConfig = in.readOptionalWriteable(PerPartitionCategorizationConfig::new); + } else { + perPartitionCategorizationConfig = null; + } customSettings = in.readMap(); modelSnapshotId = in.readOptionalString(); // was establishedModelMemory @@ -173,10 +183,8 @@ public class JobUpdate implements Writeable, ToXContentObject { @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(jobId); - if (out.getVersion().onOrAfter(Version.V_6_1_0)) { - String[] groupsArray = groups == null ? null : groups.toArray(new String[groups.size()]); - out.writeOptionalStringArray(groupsArray); - } + String[] groupsArray = groups == null ? 
null : groups.toArray(new String[0]); + out.writeOptionalStringArray(groupsArray); out.writeOptionalString(description); out.writeBoolean(detectorUpdates != null); if (detectorUpdates != null) { @@ -195,6 +203,9 @@ public class JobUpdate implements Writeable, ToXContentObject { if (categorizationFilters != null) { out.writeStringCollection(categorizationFilters); } + if (out.getVersion().onOrAfter(Version.V_7_9_0)) { + out.writeOptionalWriteable(perPartitionCategorizationConfig); + } out.writeMap(customSettings); out.writeOptionalString(modelSnapshotId); // was establishedModelMemory @@ -273,6 +284,10 @@ public class JobUpdate implements Writeable, ToXContentObject { return categorizationFilters; } + public PerPartitionCategorizationConfig getPerPartitionCategorizationConfig() { + return perPartitionCategorizationConfig; + } + public Map getCustomSettings() { return customSettings; } @@ -298,7 +313,7 @@ public class JobUpdate implements Writeable, ToXContentObject { } public boolean isAutodetectProcessUpdate() { - return modelPlotConfig != null || detectorUpdates != null || groups != null; + return modelPlotConfig != null || perPartitionCategorizationConfig != null || detectorUpdates != null || groups != null; } @Override @@ -338,6 +353,9 @@ public class JobUpdate implements Writeable, ToXContentObject { if (categorizationFilters != null) { builder.field(AnalysisConfig.CATEGORIZATION_FILTERS.getPreferredName(), categorizationFilters); } + if (perPartitionCategorizationConfig != null) { + builder.field(AnalysisConfig.PER_PARTITION_CATEGORIZATION.getPreferredName(), perPartitionCategorizationConfig); + } if (customSettings != null) { builder.field(Job.CUSTOM_SETTINGS.getPreferredName(), customSettings); } @@ -395,6 +413,9 @@ public class JobUpdate implements Writeable, ToXContentObject { if (categorizationFilters != null) { updateFields.add(AnalysisConfig.CATEGORIZATION_FILTERS.getPreferredName()); } + if (perPartitionCategorizationConfig != null) { + 
updateFields.add(AnalysisConfig.PER_PARTITION_CATEGORIZATION.getPreferredName()); + } if (customSettings != null) { updateFields.add(Job.CUSTOM_SETTINGS.getPreferredName()); } @@ -476,6 +497,14 @@ public class JobUpdate implements Writeable, ToXContentObject { if (categorizationFilters != null) { newAnalysisConfig.setCategorizationFilters(categorizationFilters); } + if (perPartitionCategorizationConfig != null) { + // Whether per-partition categorization is enabled cannot be changed, only the lower level details + if (perPartitionCategorizationConfig.isEnabled() != + currentAnalysisConfig.getPerPartitionCategorizationConfig().isEnabled()) { + throw ExceptionsHelper.badRequestException("analysis_config.per_partition_categorization.enabled cannot be updated"); + } + newAnalysisConfig.setPerPartitionCategorizationConfig(perPartitionCategorizationConfig); + } if (customSettings != null) { builder.setCustomSettings(customSettings); } @@ -513,6 +542,8 @@ public class JobUpdate implements Writeable, ToXContentObject { && (resultsRetentionDays == null || Objects.equals(resultsRetentionDays, job.getResultsRetentionDays())) && (categorizationFilters == null || Objects.equals(categorizationFilters, job.getAnalysisConfig().getCategorizationFilters())) + && (perPartitionCategorizationConfig == null + || Objects.equals(perPartitionCategorizationConfig, job.getAnalysisConfig().getPerPartitionCategorizationConfig())) && (customSettings == null || Objects.equals(customSettings, job.getCustomSettings())) && (modelSnapshotId == null || Objects.equals(modelSnapshotId, job.getModelSnapshotId())) && (modelSnapshotMinVersion == null || Objects.equals(modelSnapshotMinVersion, job.getModelSnapshotMinVersion())) @@ -563,6 +594,7 @@ public class JobUpdate implements Writeable, ToXContentObject { && Objects.equals(this.dailyModelSnapshotRetentionAfterDays, that.dailyModelSnapshotRetentionAfterDays) && Objects.equals(this.resultsRetentionDays, that.resultsRetentionDays) && 
Objects.equals(this.categorizationFilters, that.categorizationFilters) + && Objects.equals(this.perPartitionCategorizationConfig, that.perPartitionCategorizationConfig) && Objects.equals(this.customSettings, that.customSettings) && Objects.equals(this.modelSnapshotId, that.modelSnapshotId) && Objects.equals(this.modelSnapshotMinVersion, that.modelSnapshotMinVersion) @@ -575,8 +607,8 @@ public class JobUpdate implements Writeable, ToXContentObject { public int hashCode() { return Objects.hash(jobId, groups, description, detectorUpdates, modelPlotConfig, analysisLimits, renormalizationWindowDays, backgroundPersistInterval, modelSnapshotRetentionDays, dailyModelSnapshotRetentionAfterDays, resultsRetentionDays, - categorizationFilters, customSettings, modelSnapshotId, modelSnapshotMinVersion, jobVersion, clearJobFinishTime, - allowLazyOpen); + categorizationFilters, perPartitionCategorizationConfig, customSettings, modelSnapshotId, modelSnapshotMinVersion, + jobVersion, clearJobFinishTime, allowLazyOpen); } public static class DetectorUpdate implements Writeable, ToXContentObject { @@ -684,6 +716,7 @@ public class JobUpdate implements Writeable, ToXContentObject { private Long dailyModelSnapshotRetentionAfterDays; private Long resultsRetentionDays; private List categorizationFilters; + private PerPartitionCategorizationConfig perPartitionCategorizationConfig; private Map customSettings; private String modelSnapshotId; private Version modelSnapshotMinVersion; @@ -755,6 +788,11 @@ public class JobUpdate implements Writeable, ToXContentObject { return this; } + public Builder setPerPartitionCategorizationConfig(PerPartitionCategorizationConfig perPartitionCategorizationConfig) { + this.perPartitionCategorizationConfig = perPartitionCategorizationConfig; + return this; + } + public Builder setCustomSettings(Map customSettings) { this.customSettings = customSettings; return this; @@ -798,8 +836,8 @@ public class JobUpdate implements Writeable, ToXContentObject { public 
JobUpdate build() { return new JobUpdate(jobId, groups, description, detectorUpdates, modelPlotConfig, analysisLimits, backgroundPersistInterval, renormalizationWindowDays, resultsRetentionDays, modelSnapshotRetentionDays, dailyModelSnapshotRetentionAfterDays, - categorizationFilters, customSettings, modelSnapshotId, modelSnapshotMinVersion, jobVersion, clearJobFinishTime, - allowLazyOpen); + categorizationFilters, perPartitionCategorizationConfig, customSettings, modelSnapshotId, modelSnapshotMinVersion, + jobVersion, clearJobFinishTime, allowLazyOpen); } } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/PerPartitionCategorizationConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/PerPartitionCategorizationConfig.java new file mode 100644 index 00000000000..0dd8959e8f1 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/PerPartitionCategorizationConfig.java @@ -0,0 +1,105 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ +package org.elasticsearch.xpack.core.ml.job.config; + +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; + +import java.io.IOException; +import java.util.Objects; + +public class PerPartitionCategorizationConfig implements ToXContentObject, Writeable { + + public static final ParseField TYPE_FIELD = new ParseField("per_partition_categorization"); + public static final ParseField ENABLED_FIELD = new ParseField("enabled"); + public static final ParseField STOP_ON_WARN = new ParseField("stop_on_warn"); + + // These parsers follow the pattern that metadata is parsed leniently (to allow for enhancements), whilst config is parsed strictly + public static final ConstructingObjectParser LENIENT_PARSER = createParser(true); + public static final ConstructingObjectParser STRICT_PARSER = createParser(false); + + private static ConstructingObjectParser createParser(boolean ignoreUnknownFields) { + ConstructingObjectParser parser = + new ConstructingObjectParser<>(TYPE_FIELD.getPreferredName(), ignoreUnknownFields, + a -> new PerPartitionCategorizationConfig((boolean) a[0], (Boolean) a[1])); + + parser.declareBoolean(ConstructingObjectParser.constructorArg(), ENABLED_FIELD); + parser.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), STOP_ON_WARN); + + return parser; + } + + private final boolean enabled; + private final boolean stopOnWarn; + + public PerPartitionCategorizationConfig() { + this(false, null); + } + + public PerPartitionCategorizationConfig(boolean enabled, Boolean stopOnWarn) { + this.enabled = enabled; + this.stopOnWarn = (stopOnWarn == 
null) ? false : stopOnWarn; + if (this.enabled == false && this.stopOnWarn) { + throw ExceptionsHelper.badRequestException(STOP_ON_WARN.getPreferredName() + " cannot be true in " + + TYPE_FIELD.getPreferredName() + " when " + ENABLED_FIELD.getPreferredName() + " is false"); + } + } + + public PerPartitionCategorizationConfig(StreamInput in) throws IOException { + enabled = in.readBoolean(); + stopOnWarn = in.readBoolean(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeBoolean(enabled); + out.writeBoolean(stopOnWarn); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(ENABLED_FIELD.getPreferredName(), enabled); + if (enabled) { + builder.field(STOP_ON_WARN.getPreferredName(), stopOnWarn); + } + builder.endObject(); + return builder; + } + + public boolean isEnabled() { + return enabled; + } + + public boolean isStopOnWarn() { + return stopOnWarn; + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + + if (other instanceof PerPartitionCategorizationConfig == false) { + return false; + } + + PerPartitionCategorizationConfig that = (PerPartitionCategorizationConfig) other; + return this.enabled == that.enabled && this.stopOnWarn == that.stopOnWarn; + } + + @Override + public int hashCode() { + return Objects.hash(enabled, stopOnWarn); + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/AnomalyRecord.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/AnomalyRecord.java index ecae60d98a6..af21f9cad81 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/AnomalyRecord.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/AnomalyRecord.java @@ -253,12 +253,6 @@ public class AnomalyRecord implements ToXContentObject, Writeable { 
@Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); - innerToXContent(builder, params); - builder.endObject(); - return builder; - } - - XContentBuilder innerToXContent(XContentBuilder builder, Params params) throws IOException { builder.field(Job.ID.getPreferredName(), jobId); builder.field(Result.RESULT_TYPE.getPreferredName(), RESULT_TYPE_VALUE); builder.field(PROBABILITY.getPreferredName(), probability); @@ -321,6 +315,7 @@ public class AnomalyRecord implements ToXContentObject, Writeable { for (String fieldName : inputFields.keySet()) { builder.field(fieldName, inputFields.get(fieldName)); } + builder.endObject(); return builder; } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/CategoryDefinition.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/CategoryDefinition.java index 9f036a1197b..a34bf6345c7 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/CategoryDefinition.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/CategoryDefinition.java @@ -32,6 +32,8 @@ public class CategoryDefinition implements ToXContentObject, Writeable { public static final ParseField TYPE = new ParseField("category_definition"); public static final ParseField CATEGORY_ID = new ParseField("category_id"); + public static final ParseField PARTITION_FIELD_NAME = new ParseField("partition_field_name"); + public static final ParseField PARTITION_FIELD_VALUE = new ParseField("partition_field_value"); public static final ParseField TERMS = new ParseField("terms"); public static final ParseField REGEX = new ParseField("regex"); public static final ParseField MAX_MATCHING_LENGTH = new ParseField("max_matching_length"); @@ -52,6 +54,8 @@ public class CategoryDefinition implements ToXContentObject, Writeable { 
parser.declareString(ConstructingObjectParser.constructorArg(), Job.ID); parser.declareLong(CategoryDefinition::setCategoryId, CATEGORY_ID); + parser.declareString(CategoryDefinition::setPartitionFieldName, PARTITION_FIELD_NAME); + parser.declareString(CategoryDefinition::setPartitionFieldValue, PARTITION_FIELD_VALUE); parser.declareString(CategoryDefinition::setTerms, TERMS); parser.declareString(CategoryDefinition::setRegex, REGEX); parser.declareLong(CategoryDefinition::setMaxMatchingLength, MAX_MATCHING_LENGTH); @@ -64,6 +68,8 @@ public class CategoryDefinition implements ToXContentObject, Writeable { private final String jobId; private long categoryId = 0L; + private String partitionFieldName; + private String partitionFieldValue; private String terms = ""; private String regex = ""; private long maxMatchingLength = 0L; @@ -80,6 +86,10 @@ public class CategoryDefinition implements ToXContentObject, Writeable { public CategoryDefinition(StreamInput in) throws IOException { jobId = in.readString(); categoryId = in.readLong(); + if (in.getVersion().onOrAfter(Version.V_7_9_0)) { + partitionFieldName = in.readOptionalString(); + partitionFieldValue = in.readOptionalString(); + } terms = in.readString(); regex = in.readString(); maxMatchingLength = in.readLong(); @@ -97,6 +107,10 @@ public class CategoryDefinition implements ToXContentObject, Writeable { public void writeTo(StreamOutput out) throws IOException { out.writeString(jobId); out.writeLong(categoryId); + if (out.getVersion().onOrAfter(Version.V_7_9_0)) { + out.writeOptionalString(partitionFieldName); + out.writeOptionalString(partitionFieldValue); + } out.writeString(terms); out.writeString(regex); out.writeLong(maxMatchingLength); @@ -126,6 +140,22 @@ public class CategoryDefinition implements ToXContentObject, Writeable { this.categoryId = categoryId; } + public String getPartitionFieldName() { + return partitionFieldName; + } + + public void setPartitionFieldName(String partitionFieldName) { + 
this.partitionFieldName = partitionFieldName; + } + + public String getPartitionFieldValue() { + return partitionFieldValue; + } + + public void setPartitionFieldValue(String partitionFieldValue) { + this.partitionFieldValue = partitionFieldValue; + } + public String getTerms() { return terms; } @@ -204,6 +234,12 @@ public class CategoryDefinition implements ToXContentObject, Writeable { builder.startObject(); builder.field(Job.ID.getPreferredName(), jobId); builder.field(CATEGORY_ID.getPreferredName(), categoryId); + if (partitionFieldName != null) { + builder.field(PARTITION_FIELD_NAME.getPreferredName(), partitionFieldName); + } + if (partitionFieldValue != null) { + builder.field(PARTITION_FIELD_VALUE.getPreferredName(), partitionFieldValue); + } builder.field(TERMS.getPreferredName(), terms); builder.field(REGEX.getPreferredName(), regex); builder.field(MAX_MATCHING_LENGTH.getPreferredName(), maxMatchingLength); @@ -217,6 +253,13 @@ public class CategoryDefinition implements ToXContentObject, Writeable { if (numMatches > 0) { builder.field(NUM_MATCHES.getPreferredName(), numMatches); } + + // Copy the pattern from AnomalyRecord that by/over/partition field values are added to results + // as key value pairs after all the fixed fields if they won't clash with reserved fields + if (partitionFieldName != null && partitionFieldValue != null && ReservedFieldNames.isValidFieldName(partitionFieldName)) { + builder.field(partitionFieldName, partitionFieldValue); + } + builder.endObject(); return builder; } @@ -232,6 +275,8 @@ public class CategoryDefinition implements ToXContentObject, Writeable { CategoryDefinition that = (CategoryDefinition) other; return Objects.equals(this.jobId, that.jobId) && Objects.equals(this.categoryId, that.categoryId) + && Objects.equals(this.partitionFieldName, that.partitionFieldName) + && Objects.equals(this.partitionFieldValue, that.partitionFieldValue) && Objects.equals(this.terms, that.terms) && Objects.equals(this.regex, that.regex) 
&& Objects.equals(this.maxMatchingLength, that.maxMatchingLength) @@ -245,6 +290,8 @@ public class CategoryDefinition implements ToXContentObject, Writeable { public int hashCode() { return Objects.hash(jobId, categoryId, + partitionFieldName, + partitionFieldValue, terms, regex, maxMatchingLength, diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/ReservedFieldNames.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/ReservedFieldNames.java index 1f01129be48..c26bd2c6ce9 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/ReservedFieldNames.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/ReservedFieldNames.java @@ -25,6 +25,7 @@ import org.elasticsearch.xpack.core.ml.job.config.Detector; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.ModelPlotConfig; import org.elasticsearch.xpack.core.ml.job.config.Operator; +import org.elasticsearch.xpack.core.ml.job.config.PerPartitionCategorizationConfig; import org.elasticsearch.xpack.core.ml.job.config.RuleCondition; import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; @@ -243,6 +244,7 @@ public final class ReservedFieldNames { AnalysisConfig.CATEGORIZATION_FIELD_NAME.getPreferredName(), AnalysisConfig.CATEGORIZATION_FILTERS.getPreferredName(), AnalysisConfig.CATEGORIZATION_ANALYZER.getPreferredName(), + AnalysisConfig.PER_PARTITION_CATEGORIZATION.getPreferredName(), AnalysisConfig.LATENCY.getPreferredName(), AnalysisConfig.SUMMARY_COUNT_FIELD_NAME.getPreferredName(), AnalysisConfig.DETECTORS.getPreferredName(), @@ -280,6 +282,8 @@ public final class ReservedFieldNames { ModelPlotConfig.TERMS_FIELD.getPreferredName(), ModelPlotConfig.ANNOTATIONS_ENABLED_FIELD.getPreferredName(), + 
PerPartitionCategorizationConfig.STOP_ON_WARN.getPreferredName(), + DatafeedConfig.ID.getPreferredName(), DatafeedConfig.QUERY_DELAY.getPreferredName(), DatafeedConfig.FREQUENCY.getPreferredName(), diff --git a/x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/config_index_mappings.json b/x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/config_index_mappings.json index b2f38bf6f4f..4c8cda279db 100644 --- a/x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/config_index_mappings.json +++ b/x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/config_index_mappings.json @@ -193,6 +193,16 @@ "multivariate_by_fields" : { "type" : "boolean" }, + "per_partition_categorization" : { + "properties" : { + "enabled" : { + "type" : "boolean" + }, + "stop_on_warn" : { + "type" : "boolean" + } + } + }, "summary_count_field_name" : { "type" : "keyword" } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetCategoriesRequestTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetCategoriesRequestTests.java index b3d93ed5c9f..a9d787c91d7 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetCategoriesRequestTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetCategoriesRequestTests.java @@ -23,6 +23,9 @@ public class GetCategoriesRequestTests extends AbstractSerializingTestCase { +public class UpdateJobActionRequestTests extends AbstractWireSerializingTestCase { @Override protected UpdateJobAction.Request createTestInstance() { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/UpdateProcessActionRequestTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/UpdateProcessActionRequestTests.java index c8a331e4081..7a5f46ec6f4 100644 --- 
a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/UpdateProcessActionRequestTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/UpdateProcessActionRequestTests.java @@ -12,6 +12,7 @@ import org.elasticsearch.xpack.core.ml.job.config.MlFilter; import org.elasticsearch.xpack.core.ml.job.config.MlFilterTests; import org.elasticsearch.xpack.core.ml.job.config.ModelPlotConfig; import org.elasticsearch.xpack.core.ml.job.config.ModelPlotConfigTests; +import org.elasticsearch.xpack.core.ml.job.config.PerPartitionCategorizationConfig; import java.util.ArrayList; import java.util.List; @@ -20,9 +21,13 @@ public class UpdateProcessActionRequestTests extends AbstractWireSerializingTest @Override protected UpdateProcessAction.Request createTestInstance() { - ModelPlotConfig config = null; + ModelPlotConfig modelPlotConfig = null; if (randomBoolean()) { - config = ModelPlotConfigTests.createRandomized(); + modelPlotConfig = ModelPlotConfigTests.createRandomized(); + } + PerPartitionCategorizationConfig perPartitionCategorizationConfig = null; + if (randomBoolean()) { + perPartitionCategorizationConfig = new PerPartitionCategorizationConfig(true, randomBoolean()); } List updates = null; if (randomBoolean()) { @@ -36,7 +41,8 @@ public class UpdateProcessActionRequestTests extends AbstractWireSerializingTest if (randomBoolean()) { filter = MlFilterTests.createTestFilter(); } - return new UpdateProcessAction.Request(randomAlphaOfLength(10), config, updates, filter, randomBoolean()); + return new UpdateProcessAction.Request(randomAlphaOfLength(10), modelPlotConfig, perPartitionCategorizationConfig, updates, + filter, randomBoolean()); } @Override diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/AnalysisConfigTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/AnalysisConfigTests.java index 0e38be0c04d..303f6b8380e 100644 --- 
a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/AnalysisConfigTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/AnalysisConfigTests.java @@ -25,6 +25,7 @@ import java.util.Set; import java.util.TreeSet; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; public class AnalysisConfigTests extends AbstractSerializingTestCase { @@ -39,14 +40,16 @@ public class AnalysisConfigTests extends AbstractSerializingTestCase detectors = new ArrayList<>(); + for (String partitionFieldValue : Arrays.asList("part1", "part2")) { + Detector.Builder detector = new Detector.Builder("count", null); + detector.setByFieldName("mlcategory"); + detector.setPartitionFieldName(partitionFieldValue); + detectors.add(detector.build()); + } + AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(detectors); + analysisConfig.setCategorizationFieldName("msg"); + analysisConfig.setPerPartitionCategorizationConfig(new PerPartitionCategorizationConfig(true, randomBoolean())); + + ElasticsearchException e = ESTestCase.expectThrows(ElasticsearchException.class, analysisConfig::build); + + assertEquals( + "partition_field_name cannot vary between detectors when per-partition categorization is enabled: [part1] and [part2] are used", + e.getMessage()); + } + + public void testVerify_GivenPerPartitionCategorizationAndNoPartitionFieldOnCategorizationDetector() { + + List detectors = new ArrayList<>(); + Detector.Builder detector = new Detector.Builder("count", null); + detector.setByFieldName("mlcategory"); + detectors.add(detector.build()); + detector = new Detector.Builder("mean", "responsetime"); + detector.setPartitionFieldName("airline"); + detectors.add(detector.build()); + AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(detectors); + analysisConfig.setCategorizationFieldName("msg"); + analysisConfig.setPerPartitionCategorizationConfig(new 
PerPartitionCategorizationConfig(true, randomBoolean())); + + ElasticsearchException e = ESTestCase.expectThrows(ElasticsearchException.class, analysisConfig::build); + + assertEquals( + "partition_field_name must be set for detectors that reference mlcategory when per-partition categorization is enabled", + e.getMessage()); + } + + public void testVerify_GivenComplexPerPartitionCategorizationConfig() { + + List detectors = new ArrayList<>(); + Detector.Builder detector = new Detector.Builder("count", null); + detector.setByFieldName("mlcategory"); + detector.setPartitionFieldName("event.dataset"); + detectors.add(detector.build()); + detector = new Detector.Builder("mean", "responsetime"); + detector.setByFieldName("airline"); + detectors.add(detector.build()); + detector = new Detector.Builder("rare", null); + detector.setByFieldName("mlcategory"); + detector.setPartitionFieldName("event.dataset"); + detectors.add(detector.build()); + AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(detectors); + analysisConfig.setCategorizationFieldName("msg"); + boolean stopOnWarn = randomBoolean(); + analysisConfig.setPerPartitionCategorizationConfig(new PerPartitionCategorizationConfig(true, stopOnWarn)); + + assertThat(analysisConfig.build().getPerPartitionCategorizationConfig().isStopOnWarn(), is(stopOnWarn)); + } + private static AnalysisConfig.Builder createValidConfig() { List detectors = new ArrayList<>(); Detector detector = new Detector.Builder("count", null).build(); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdateTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdateTests.java index e9b731c32d3..a18620037de 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdateTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdateTests.java @@ -104,6 +104,9 @@ public class 
JobUpdateTests extends AbstractSerializingTestCase { if (randomBoolean() && jobSupportsCategorizationFilters(job)) { update.setCategorizationFilters(Arrays.asList(generateRandomStringArray(10, 10, false))); } + if (randomBoolean() && jobSupportsPerPartitionCategorization(job)) { + update.setPerPartitionCategorizationConfig(new PerPartitionCategorizationConfig(true, randomBoolean())); + } if (randomBoolean()) { update.setCustomSettings(Collections.singletonMap(randomAlphaOfLength(10), randomAlphaOfLength(10))); } @@ -139,6 +142,13 @@ public class JobUpdateTests extends AbstractSerializingTestCase { return true; } + private static boolean jobSupportsPerPartitionCategorization(@Nullable Job job) { + if (job == null) { + return true; + } + return job.getAnalysisConfig().getPerPartitionCategorizationConfig().isEnabled(); + } + private static List createRandomDetectorUpdates() { int size = randomInt(10); List detectorUpdates = new ArrayList<>(size); @@ -240,6 +250,7 @@ public class JobUpdateTests extends AbstractSerializingTestCase { updateBuilder.setDailyModelSnapshotRetentionAfterDays(randomLongBetween(0, newModelSnapshotRetentionDays)); updateBuilder.setRenormalizationWindowDays(randomNonNegativeLong()); updateBuilder.setCategorizationFilters(categorizationFilters); + updateBuilder.setPerPartitionCategorizationConfig(new PerPartitionCategorizationConfig(true, randomBoolean())); updateBuilder.setCustomSettings(customSettings); updateBuilder.setModelSnapshotId(randomAlphaOfLength(10)); updateBuilder.setJobVersion(Version.V_6_1_0); @@ -249,10 +260,12 @@ public class JobUpdateTests extends AbstractSerializingTestCase { jobBuilder.setGroups(Collections.singletonList("group-1")); Detector.Builder d1 = new Detector.Builder("info_content", "domain"); d1.setOverFieldName("mlcategory"); + d1.setPartitionFieldName("host"); Detector.Builder d2 = new Detector.Builder("min", "field"); d2.setOverFieldName("host"); AnalysisConfig.Builder ac = new 
AnalysisConfig.Builder(Arrays.asList(d1.build(), d2.build())); ac.setCategorizationFieldName("cat_field"); + ac.setPerPartitionCategorizationConfig(new PerPartitionCategorizationConfig(true, randomBoolean())); jobBuilder.setAnalysisConfig(ac); jobBuilder.setDataDescription(new DataDescription.Builder()); jobBuilder.setCreateTime(new Date()); @@ -269,6 +282,8 @@ public class JobUpdateTests extends AbstractSerializingTestCase { assertEquals(update.getModelSnapshotRetentionDays(), updatedJob.getModelSnapshotRetentionDays()); assertEquals(update.getResultsRetentionDays(), updatedJob.getResultsRetentionDays()); assertEquals(update.getCategorizationFilters(), updatedJob.getAnalysisConfig().getCategorizationFilters()); + assertEquals(update.getPerPartitionCategorizationConfig().isEnabled(), + updatedJob.getAnalysisConfig().getPerPartitionCategorizationConfig().isEnabled()); assertEquals(update.getCustomSettings(), updatedJob.getCustomSettings()); assertEquals(update.getModelSnapshotId(), updatedJob.getModelSnapshotId()); assertEquals(update.getJobVersion(), updatedJob.getJobVersion()); @@ -305,6 +320,9 @@ public class JobUpdateTests extends AbstractSerializingTestCase { assertTrue(update.isAutodetectProcessUpdate()); update = new JobUpdate.Builder("foo").setGroups(Collections.singletonList("bar")).build(); assertTrue(update.isAutodetectProcessUpdate()); + update = new JobUpdate.Builder("foo") + .setPerPartitionCategorizationConfig(new PerPartitionCategorizationConfig(true, true)).build(); + assertTrue(update.isAutodetectProcessUpdate()); } public void testUpdateAnalysisLimitWithValueGreaterThanMax() { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/PerPartitionCategorizationConfigTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/PerPartitionCategorizationConfigTests.java new file mode 100644 index 00000000000..eae19bd883b --- /dev/null +++ 
b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/PerPartitionCategorizationConfigTests.java @@ -0,0 +1,45 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core.ml.job.config; + +import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractSerializingTestCase; + +import static org.hamcrest.Matchers.is; + +public class PerPartitionCategorizationConfigTests extends AbstractSerializingTestCase { + + public void testConstructorDefaults() { + assertThat(new PerPartitionCategorizationConfig().isEnabled(), is(false)); + assertThat(new PerPartitionCategorizationConfig().isStopOnWarn(), is(false)); + } + + public void testValidation() { + ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, + () -> new PerPartitionCategorizationConfig(false, true)); + + assertThat(e.getMessage(), is("stop_on_warn cannot be true in per_partition_categorization when enabled is false")); + } + + @Override + protected PerPartitionCategorizationConfig createTestInstance() { + boolean enabled = randomBoolean(); + return new PerPartitionCategorizationConfig(enabled, randomBoolean() ? 
null : enabled && randomBoolean()); + } + + @Override + protected Writeable.Reader instanceReader() { + return PerPartitionCategorizationConfig::new; + } + + @Override + protected PerPartitionCategorizationConfig doParseInstance(XContentParser parser) { + return PerPartitionCategorizationConfig.STRICT_PARSER.apply(parser, null); + } +} diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java index 4815df1b55a..8be127aac78 100644 --- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java @@ -218,7 +218,9 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase { QueryPage persistedInfluencers = getInfluencers(); assertResultsAreSame(influencers, persistedInfluencers); - QueryPage persistedDefinition = getCategoryDefinition(categoryDefinition.getCategoryId()); + QueryPage persistedDefinition = + getCategoryDefinition(randomBoolean() ? categoryDefinition.getCategoryId() : null, + randomBoolean() ? 
categoryDefinition.getPartitionFieldValue() : null); assertEquals(1, persistedDefinition.count()); assertEquals(categoryDefinition, persistedDefinition.results().get(0)); @@ -599,11 +601,12 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase { return resultHolder.get(); } - private QueryPage getCategoryDefinition(long categoryId) throws Exception { + private QueryPage getCategoryDefinition(Long categoryId, String partitionFieldValue) throws Exception { AtomicReference errorHolder = new AtomicReference<>(); AtomicReference> resultHolder = new AtomicReference<>(); CountDownLatch latch = new CountDownLatch(1); - jobResultsProvider.categoryDefinitions(JOB_ID, categoryId, false, null, null, r -> { + jobResultsProvider.categoryDefinitions(JOB_ID, categoryId, partitionFieldValue, false, (categoryId == null) ? 0 : null, + (categoryId == null) ? 100 : null, r -> { resultHolder.set(r); latch.countDown(); }, e -> { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetCategoriesAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetCategoriesAction.java index f018f853dc8..f9b2f195087 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetCategoriesAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetCategoriesAction.java @@ -37,8 +37,8 @@ public class TransportGetCategoriesAction extends HandledTransportAction { Integer from = request.getPageParams() != null ? request.getPageParams().getFrom() : null; Integer size = request.getPageParams() != null ? 
request.getPageParams().getSize() : null; - jobResultsProvider.categoryDefinitions(request.getJobId(), request.getCategoryId(), true, from, size, - r -> listener.onResponse(new GetCategoriesAction.Response(r)), listener::onFailure, client); + jobResultsProvider.categoryDefinitions(request.getJobId(), request.getCategoryId(), request.getPartitionFieldValue(), + true, from, size, r -> listener.onResponse(new GetCategoriesAction.Response(r)), listener::onFailure, client); }, listener::onFailure )); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateProcessAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateProcessAction.java index 94c04b1f738..dadedfe0a31 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateProcessAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateProcessAction.java @@ -30,6 +30,7 @@ public class TransportUpdateProcessAction extends TransportJobTaskAction listener) { UpdateParams updateParams = UpdateParams.builder(request.getJobId()) .modelPlotConfig(request.getModelPlotConfig()) + .perPartitionCategorizationConfig(request.getPerPartitionCategorizationConfig()) .detectorUpdates(request.getDetectorUpdates()) .filter(request.getFilter()) .updateScheduledEvents(request.isUpdateScheduledEvents()) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/UpdateJobProcessNotifier.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/UpdateJobProcessNotifier.java index a1bc5aa357c..5e1425f3370 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/UpdateJobProcessNotifier.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/UpdateJobProcessNotifier.java @@ -115,8 +115,8 @@ public class UpdateJobProcessNotifier { return; } - Request request = new Request(update.getJobId(), update.getModelPlotConfig(), 
update.getDetectorUpdates(), update.getFilter(), - update.isUpdateScheduledEvents()); + Request request = new Request(update.getJobId(), update.getModelPlotConfig(), update.getPerPartitionCategorizationConfig(), + update.getDetectorUpdates(), update.getFilter(), update.isUpdateScheduledEvents()); executeAsyncWithOrigin(client, ML_ORIGIN, UpdateProcessAction.INSTANCE, request, new ActionListener() { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProvider.java index c8701260843..1d1db92b7f5 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProvider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProvider.java @@ -386,7 +386,7 @@ public class JobResultsProvider { @SuppressWarnings("unchecked") public static int countFields(Map mapping) { Object propertiesNode = mapping.get("properties"); - if (propertiesNode != null && propertiesNode instanceof Map) { + if (propertiesNode instanceof Map) { mapping = (Map) propertiesNode; } else { return 0; @@ -817,11 +817,13 @@ public class JobResultsProvider { * Get a page of {@linkplain CategoryDefinition}s for the given jobId. * Uses a supplied client, so may run as the currently authenticated user * @param jobId the job id + * @param categoryId a specific category ID to retrieve, or null to retrieve as many as possible + * @param partitionFieldValue the partition field value to filter on, or null for no filtering * @param augment Should the category definition be augmented with a Grok pattern? * @param from Skip the first N categories. 
This parameter is for paging * @param size Take only this number of categories */ - public void categoryDefinitions(String jobId, Long categoryId, boolean augment, Integer from, Integer size, + public void categoryDefinitions(String jobId, Long categoryId, String partitionFieldValue, boolean augment, Integer from, Integer size, Consumer> handler, Consumer errorHandler, Client client) { if (categoryId != null && (from != null || size != null)) { @@ -834,16 +836,25 @@ public class JobResultsProvider { SearchRequest searchRequest = new SearchRequest(indexName); searchRequest.indicesOptions(MlIndicesUtils.addIgnoreUnavailable(searchRequest.indicesOptions())); + QueryBuilder categoryIdQuery; SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); if (categoryId != null) { - sourceBuilder.query(QueryBuilders.termQuery(CategoryDefinition.CATEGORY_ID.getPreferredName(), categoryId)); + categoryIdQuery = QueryBuilders.termQuery(CategoryDefinition.CATEGORY_ID.getPreferredName(), categoryId); } else if (from != null && size != null) { + categoryIdQuery = QueryBuilders.existsQuery(CategoryDefinition.CATEGORY_ID.getPreferredName()); sourceBuilder.from(from).size(size) - .query(QueryBuilders.existsQuery(CategoryDefinition.CATEGORY_ID.getPreferredName())) .sort(new FieldSortBuilder(CategoryDefinition.CATEGORY_ID.getPreferredName()).order(SortOrder.ASC)); } else { throw new IllegalStateException("Both categoryId and pageParams are not specified"); } + if (partitionFieldValue != null) { + QueryBuilder partitionQuery = + QueryBuilders.termQuery(CategoryDefinition.PARTITION_FIELD_VALUE.getPreferredName(), partitionFieldValue); + QueryBuilder combinedQuery = QueryBuilders.boolQuery().must(categoryIdQuery).must(partitionQuery); + sourceBuilder.query(combinedQuery); + } else { + sourceBuilder.query(categoryIdQuery); + } sourceBuilder.trackTotalHits(true); searchRequest.source(sourceBuilder); executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, searchRequest, 
@@ -1419,9 +1430,7 @@ public class JobResultsProvider { executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, updateRequest, ActionListener.wrap( - response -> { - handler.accept(updatedCalendar); - }, + response -> handler.accept(updatedCalendar), errorHandler) , client::update); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java index 3fa4c36cb04..6b98a164aac 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java @@ -220,6 +220,10 @@ public class AutodetectCommunicator implements Closeable { autodetectProcess.writeUpdateModelPlotMessage(update.getModelPlotConfig()); } + if (update.getPerPartitionCategorizationConfig() != null) { + autodetectProcess.writeUpdatePerPartitionCategorizationMessage(update.getPerPartitionCategorizationConfig()); + } + // Filters have to be written before detectors if (update.getFilter() != null) { autodetectProcess.writeUpdateFiltersMessage(Collections.singletonList(update.getFilter())); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcess.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcess.java index 2ec484ee052..5c205e968af 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcess.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcess.java @@ -10,6 +10,7 @@ import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent; import org.elasticsearch.xpack.core.ml.job.config.DetectionRule; import 
org.elasticsearch.xpack.core.ml.job.config.MlFilter; import org.elasticsearch.xpack.core.ml.job.config.ModelPlotConfig; +import org.elasticsearch.xpack.core.ml.job.config.PerPartitionCategorizationConfig; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.ml.job.persistence.StateStreamer; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; @@ -50,6 +51,14 @@ public interface AutodetectProcess extends NativeProcess { */ void writeUpdateModelPlotMessage(ModelPlotConfig modelPlotConfig) throws IOException; + /** + * Update the per-partition categorization configuration + * + * @param perPartitionCategorizationConfig New per-partition categorization config + * @throws IOException If the write fails + */ + void writeUpdatePerPartitionCategorizationMessage(PerPartitionCategorizationConfig perPartitionCategorizationConfig) throws IOException; + /** * Write message to update the detector rules * diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java index ad983c8f77a..76123b3886a 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java @@ -10,6 +10,7 @@ import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent; import org.elasticsearch.xpack.core.ml.job.config.DetectionRule; import org.elasticsearch.xpack.core.ml.job.config.MlFilter; import org.elasticsearch.xpack.core.ml.job.config.ModelPlotConfig; +import org.elasticsearch.xpack.core.ml.job.config.PerPartitionCategorizationConfig; import org.elasticsearch.xpack.ml.job.persistence.StateStreamer; import 
org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; @@ -84,6 +85,10 @@ public class BlackHoleAutodetectProcess implements AutodetectProcess { public void writeUpdateModelPlotMessage(ModelPlotConfig modelPlotConfig) throws IOException { } + @Override + public void writeUpdatePerPartitionCategorizationMessage(PerPartitionCategorizationConfig perPartitionCategorizationConfig) { + } + @Override public void writeUpdateDetectorRulesMessage(int detectorIndex, List rules) throws IOException { } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java index 3ab0a843d77..a7b02290496 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java @@ -12,6 +12,7 @@ import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent; import org.elasticsearch.xpack.core.ml.job.config.DetectionRule; import org.elasticsearch.xpack.core.ml.job.config.MlFilter; import org.elasticsearch.xpack.core.ml.job.config.ModelPlotConfig; +import org.elasticsearch.xpack.core.ml.job.config.PerPartitionCategorizationConfig; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.ml.job.persistence.StateStreamer; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; @@ -80,6 +81,12 @@ class NativeAutodetectProcess extends AbstractNativeProcess implements Autodetec newMessageWriter().writeUpdateModelPlotMessage(modelPlotConfig); } + @Override + public void writeUpdatePerPartitionCategorizationMessage(PerPartitionCategorizationConfig 
perPartitionCategorizationConfig) + throws IOException { + // TODO: write the control message once it's been implemented on the C++ side + } + @Override public void writeUpdateDetectorRulesMessage(int detectorIndex, List rules) throws IOException { newMessageWriter().writeUpdateDetectorRulesMessage(detectorIndex, rules); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/UpdateParams.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/UpdateParams.java index 2d338890f9f..20a2ddd6831 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/UpdateParams.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/UpdateParams.java @@ -9,6 +9,7 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.xpack.core.ml.job.config.JobUpdate; import org.elasticsearch.xpack.core.ml.job.config.MlFilter; import org.elasticsearch.xpack.core.ml.job.config.ModelPlotConfig; +import org.elasticsearch.xpack.core.ml.job.config.PerPartitionCategorizationConfig; import java.util.List; import java.util.Objects; @@ -17,14 +18,18 @@ public final class UpdateParams { private final String jobId; private final ModelPlotConfig modelPlotConfig; + private final PerPartitionCategorizationConfig perPartitionCategorizationConfig; private final List detectorUpdates; private final MlFilter filter; private final boolean updateScheduledEvents; - private UpdateParams(String jobId, @Nullable ModelPlotConfig modelPlotConfig, @Nullable List detectorUpdates, + private UpdateParams(String jobId, @Nullable ModelPlotConfig modelPlotConfig, + @Nullable PerPartitionCategorizationConfig perPartitionCategorizationConfig, + @Nullable List detectorUpdates, @Nullable MlFilter filter, boolean updateScheduledEvents) { this.jobId = Objects.requireNonNull(jobId); this.modelPlotConfig = modelPlotConfig; + this.perPartitionCategorizationConfig = 
perPartitionCategorizationConfig; this.detectorUpdates = detectorUpdates; this.filter = filter; this.updateScheduledEvents = updateScheduledEvents; @@ -39,6 +44,11 @@ public final class UpdateParams { return modelPlotConfig; } + @Nullable + public PerPartitionCategorizationConfig getPerPartitionCategorizationConfig() { + return perPartitionCategorizationConfig; + } + @Nullable public List getDetectorUpdates() { return detectorUpdates; @@ -55,7 +65,7 @@ public final class UpdateParams { * update to external resources a job uses (e.g. calendars, filters). */ public boolean isJobUpdate() { - return modelPlotConfig != null || detectorUpdates != null; + return modelPlotConfig != null || detectorUpdates != null || perPartitionCategorizationConfig != null; } public boolean isUpdateScheduledEvents() { @@ -65,6 +75,7 @@ public final class UpdateParams { public static UpdateParams fromJobUpdate(JobUpdate jobUpdate) { return new Builder(jobUpdate.getJobId()) .modelPlotConfig(jobUpdate.getModelPlotConfig()) + .perPartitionCategorizationConfig(jobUpdate.getPerPartitionCategorizationConfig()) .detectorUpdates(jobUpdate.getDetectorUpdates()) .updateScheduledEvents(jobUpdate.getGroups() != null) .build(); @@ -86,6 +97,7 @@ public final class UpdateParams { private String jobId; private ModelPlotConfig modelPlotConfig; + private PerPartitionCategorizationConfig perPartitionCategorizationConfig; private List detectorUpdates; private MlFilter filter; private boolean updateScheduledEvents; @@ -99,6 +111,11 @@ public final class UpdateParams { return this; } + public Builder perPartitionCategorizationConfig(PerPartitionCategorizationConfig perPartitionCategorizationConfig) { + this.perPartitionCategorizationConfig = perPartitionCategorizationConfig; + return this; + } + public Builder detectorUpdates(List detectorUpdates) { this.detectorUpdates = detectorUpdates; return this; @@ -115,7 +132,8 @@ public final class UpdateParams { } public UpdateParams build() { - return new 
UpdateParams(jobId, modelPlotConfig, detectorUpdates, filter, updateScheduledEvents); + return new UpdateParams(jobId, modelPlotConfig, perPartitionCategorizationConfig, detectorUpdates, filter, + updateScheduledEvents); } } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/UpdateProcessMessage.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/UpdateProcessMessage.java index 4686d4ed372..a0f9662e3a5 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/UpdateProcessMessage.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/UpdateProcessMessage.java @@ -10,19 +10,24 @@ import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent; import org.elasticsearch.xpack.core.ml.job.config.JobUpdate; import org.elasticsearch.xpack.core.ml.job.config.MlFilter; import org.elasticsearch.xpack.core.ml.job.config.ModelPlotConfig; +import org.elasticsearch.xpack.core.ml.job.config.PerPartitionCategorizationConfig; import java.util.List; public final class UpdateProcessMessage { @Nullable private final ModelPlotConfig modelPlotConfig; + @Nullable private final PerPartitionCategorizationConfig perPartitionCategorizationConfig; @Nullable private final List detectorUpdates; @Nullable private final MlFilter filter; @Nullable private final List scheduledEvents; - private UpdateProcessMessage(@Nullable ModelPlotConfig modelPlotConfig, @Nullable List detectorUpdates, + private UpdateProcessMessage(@Nullable ModelPlotConfig modelPlotConfig, + @Nullable PerPartitionCategorizationConfig perPartitionCategorizationConfig, + @Nullable List detectorUpdates, @Nullable MlFilter filter, List scheduledEvents) { this.modelPlotConfig = modelPlotConfig; + this.perPartitionCategorizationConfig = perPartitionCategorizationConfig; this.detectorUpdates = detectorUpdates; this.filter = filter; this.scheduledEvents = scheduledEvents; @@ -33,6 
+38,11 @@ public final class UpdateProcessMessage { return modelPlotConfig; } + @Nullable + public PerPartitionCategorizationConfig getPerPartitionCategorizationConfig() { + return perPartitionCategorizationConfig; + } + @Nullable public List getDetectorUpdates() { return detectorUpdates; @@ -51,6 +61,7 @@ public final class UpdateProcessMessage { public static class Builder { @Nullable private ModelPlotConfig modelPlotConfig; + @Nullable private PerPartitionCategorizationConfig perPartitionCategorizationConfig; @Nullable private List detectorUpdates; @Nullable private MlFilter filter; @Nullable private List scheduledEvents; @@ -60,6 +71,11 @@ public final class UpdateProcessMessage { return this; } + public Builder setPerPartitionCategorizationConfig(PerPartitionCategorizationConfig perPartitionCategorizationConfig) { + this.perPartitionCategorizationConfig = perPartitionCategorizationConfig; + return this; + } + public Builder setDetectorUpdates(List detectorUpdates) { this.detectorUpdates = detectorUpdates; return this; @@ -76,7 +92,7 @@ public final class UpdateProcessMessage { } public UpdateProcessMessage build() { - return new UpdateProcessMessage(modelPlotConfig, detectorUpdates, filter, scheduledEvents); + return new UpdateProcessMessage(modelPlotConfig, perPartitionCategorizationConfig, detectorUpdates, filter, scheduledEvents); } } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/FieldConfigWriter.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/FieldConfigWriter.java index f26027b3a07..ea8b3585f23 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/FieldConfigWriter.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/FieldConfigWriter.java @@ -35,6 +35,7 @@ public class FieldConfigWriter { private static final String INFLUENCER_PREFIX = "influencer."; private 
static final String CATEGORIZATION_FIELD_OPTION = " categorizationfield="; private static final String CATEGORIZATION_FILTER_PREFIX = "categorizationfilter."; + private static final String PER_PARTITION_CATEGORIZATION_OPTION = " perpartitioncategorization="; // Note: for the Engine API summarycountfield is currently passed as a // command line option to autodetect rather than in the field config file @@ -94,14 +95,16 @@ public class FieldConfigWriter { } private void writeDetectorClause(int detectorId, Detector detector, StringBuilder contents) { - contents.append(DETECTOR_PREFIX).append(detectorId) - .append(DETECTOR_CLAUSE_SUFFIX).append(EQUALS); + contents.append(DETECTOR_PREFIX).append(detectorId).append(DETECTOR_CLAUSE_SUFFIX).append(EQUALS); DefaultDetectorDescription.appendOn(detector, contents); if (Strings.isNullOrEmpty(config.getCategorizationFieldName()) == false) { - contents.append(CATEGORIZATION_FIELD_OPTION) - .append(quoteField(config.getCategorizationFieldName())); + contents.append(CATEGORIZATION_FIELD_OPTION).append(quoteField(config.getCategorizationFieldName())); + if (Strings.isNullOrEmpty(detector.getPartitionFieldName()) == false && + config.getPerPartitionCategorizationConfig().isEnabled()) { + contents.append(PER_PARTITION_CATEGORIZATION_OPTION).append("true"); + } } contents.append(NEW_LINE); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/results/RestGetCategoriesAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/results/RestGetCategoriesAction.java index 2cb15f8a49e..7cf0b1d4f03 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/results/RestGetCategoriesAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/results/RestGetCategoriesAction.java @@ -85,6 +85,7 @@ public class RestGetCategoriesAction extends BaseRestHandler { restRequest.paramAsInt(Request.SIZE.getPreferredName(), PageParams.DEFAULT_SIZE) )); } + 
request.setPartitionFieldValue(restRequest.param(Request.PARTITION_FIELD_VALUE.getPreferredName())); } return channel -> client.execute(GetCategoriesAction.INSTANCE, request, new RestToXContentListener<>(channel)); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProviderTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProviderTests.java index d869964c6dd..6d62408f79b 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProviderTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProviderTests.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.ml.job.persistence; import org.apache.lucene.search.TotalHits; +import org.apache.lucene.util.SetOnce; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; @@ -92,10 +93,9 @@ public class JobResultsProviderTests extends ESTestCase { BucketsQueryBuilder bq = new BucketsQueryBuilder().from(from).size(size).anomalyScoreThreshold(1.0); - @SuppressWarnings({"unchecked", "rawtypes"}) - QueryPage[] holder = new QueryPage[1]; - provider.buckets(jobId, bq, r -> holder[0] = r, e -> {throw new RuntimeException(e);}, client); - QueryPage buckets = holder[0]; + SetOnce> holder = new SetOnce<>(); + provider.buckets(jobId, bq, holder::set, e -> {throw new RuntimeException(e);}, client); + QueryPage buckets = holder.get(); assertEquals(1L, buckets.count()); QueryBuilder query = queryBuilderHolder[0]; String queryString = query.toString(); @@ -126,10 +126,9 @@ public class JobResultsProviderTests extends ESTestCase { BucketsQueryBuilder bq = new BucketsQueryBuilder().from(from).size(size).anomalyScoreThreshold(5.1) .includeInterim(true); - @SuppressWarnings({"unchecked", "rawtypes"}) - QueryPage[] holder = new QueryPage[1]; - provider.buckets(jobId, bq, r -> holder[0] = r, e 
-> {throw new RuntimeException(e);}, client); - QueryPage buckets = holder[0]; + SetOnce> holder = new SetOnce<>(); + provider.buckets(jobId, bq, holder::set, e -> {throw new RuntimeException(e);}, client); + QueryPage buckets = holder.get(); assertEquals(1L, buckets.count()); QueryBuilder query = queryBuilderHolder[0]; String queryString = query.toString(); @@ -162,10 +161,9 @@ public class JobResultsProviderTests extends ESTestCase { bq.anomalyScoreThreshold(5.1); bq.includeInterim(true); - @SuppressWarnings({"unchecked", "rawtypes"}) - QueryPage[] holder = new QueryPage[1]; - provider.buckets(jobId, bq, r -> holder[0] = r, e -> {throw new RuntimeException(e);}, client); - QueryPage buckets = holder[0]; + SetOnce> holder = new SetOnce<>(); + provider.buckets(jobId, bq, holder::set, e -> {throw new RuntimeException(e);}, client); + QueryPage buckets = holder.get(); assertEquals(1L, buckets.count()); QueryBuilder query = queryBuilderHolder[0]; String queryString = query.toString(); @@ -175,7 +173,7 @@ public class JobResultsProviderTests extends ESTestCase { public void testBucket_NoBucketNoExpand() throws IOException { String jobId = "TestJobIdentification"; - Long timestamp = 98765432123456789L; + long timestamp = 98765432123456789L; List> source = new ArrayList<>(); SearchResponse response = createSearchResponse(source); @@ -208,11 +206,10 @@ public class JobResultsProviderTests extends ESTestCase { BucketsQueryBuilder bq = new BucketsQueryBuilder(); bq.timestamp(Long.toString(now.getTime())); - @SuppressWarnings({"unchecked", "rawtypes"}) - QueryPage[] bucketHolder = new QueryPage[1]; - provider.buckets(jobId, bq, q -> bucketHolder[0] = q, e -> {}, client); - assertThat(bucketHolder[0].count(), equalTo(1L)); - Bucket b = bucketHolder[0].results().get(0); + SetOnce> bucketHolder = new SetOnce<>(); + provider.buckets(jobId, bq, bucketHolder::set, e -> {}, client); + assertThat(bucketHolder.get().count(), equalTo(1L)); + Bucket b = 
bucketHolder.get().results().get(0); assertEquals(now, b.getTimestamp()); } @@ -249,10 +246,9 @@ public class JobResultsProviderTests extends ESTestCase { .epochEnd(String.valueOf(now.getTime())).includeInterim(true).sortField(sortfield) .recordScore(2.2); - @SuppressWarnings({"unchecked", "rawtypes"}) - QueryPage[] holder = new QueryPage[1]; - provider.records(jobId, rqb, page -> holder[0] = page, RuntimeException::new, client); - QueryPage recordPage = holder[0]; + SetOnce> holder = new SetOnce<>(); + provider.records(jobId, rqb, holder::set, e -> { throw new RuntimeException(e); }, client); + QueryPage recordPage = holder.get(); assertEquals(2L, recordPage.count()); List records = recordPage.results(); assertEquals(22.4, records.get(0).getTypical().get(0), 0.000001); @@ -302,10 +298,9 @@ public class JobResultsProviderTests extends ESTestCase { rqb.sortField(sortfield); rqb.recordScore(2.2); - @SuppressWarnings({"unchecked", "rawtypes"}) - QueryPage[] holder = new QueryPage[1]; - provider.records(jobId, rqb, page -> holder[0] = page, RuntimeException::new, client); - QueryPage recordPage = holder[0]; + SetOnce> holder = new SetOnce<>(); + provider.records(jobId, rqb, holder::set, e -> { throw new RuntimeException(e); }, client); + QueryPage recordPage = holder.get(); assertEquals(2L, recordPage.count()); List records = recordPage.results(); assertEquals(22.4, records.get(0).getTypical().get(0), 0.000001); @@ -347,11 +342,10 @@ public class JobResultsProviderTests extends ESTestCase { Client client = getMockedClient(qb -> {}, response); JobResultsProvider provider = createProvider(client); - @SuppressWarnings({"unchecked", "rawtypes"}) - QueryPage[] holder = new QueryPage[1]; - provider.bucketRecords(jobId, bucket, from, size, true, sortfield, true, page -> holder[0] = page, RuntimeException::new, - client); - QueryPage recordPage = holder[0]; + SetOnce> holder = new SetOnce<>(); + provider.bucketRecords(jobId, bucket, from, size, true, sortfield, true, 
holder::set, + e -> { throw new RuntimeException(e); }, client); + QueryPage recordPage = holder.get(); assertEquals(2L, recordPage.count()); List records = recordPage.results(); @@ -385,7 +379,7 @@ public class JobResultsProviderTests extends ESTestCase { JobResultsProvider provider = createProvider(client); Integer[] holder = new Integer[1]; - provider.expandBucket(jobId, false, bucket, records -> holder[0] = records, RuntimeException::new, client); + provider.expandBucket(jobId, false, bucket, records -> holder[0] = records, e -> { throw new RuntimeException(e); }, client); int records = holder[0]; assertEquals(400L, records); } @@ -408,11 +402,10 @@ public class JobResultsProviderTests extends ESTestCase { Client client = getMockedClient(q -> {}, response); JobResultsProvider provider = createProvider(client); - @SuppressWarnings({"unchecked", "rawtypes"}) - QueryPage[] holder = new QueryPage[1]; - provider.categoryDefinitions(jobId, null, false, from, size, r -> holder[0] = r, - e -> {throw new RuntimeException(e);}, client); - QueryPage categoryDefinitions = holder[0]; + SetOnce> holder = new SetOnce<>(); + provider.categoryDefinitions(jobId, null, null, false, from, size, holder::set, + e -> { throw new RuntimeException(e); }, client); + QueryPage categoryDefinitions = holder.get(); assertEquals(1L, categoryDefinitions.count()); assertEquals(terms, categoryDefinitions.results().get(0).getTerms()); } @@ -430,11 +423,10 @@ public class JobResultsProviderTests extends ESTestCase { SearchResponse response = createSearchResponse(Collections.singletonList(source)); Client client = getMockedClient(q -> {}, response); JobResultsProvider provider = createProvider(client); - @SuppressWarnings({"unchecked", "rawtypes"}) - QueryPage[] holder = new QueryPage[1]; - provider.categoryDefinitions(jobId, categoryId, false, null, null, - r -> holder[0] = r, e -> {throw new RuntimeException(e);}, client); - QueryPage categoryDefinitions = holder[0]; + SetOnce> holder = new 
SetOnce<>(); + provider.categoryDefinitions(jobId, categoryId, null, false, null, null, + holder::set, e -> { throw new RuntimeException(e); }, client); + QueryPage categoryDefinitions = holder.get(); assertEquals(1L, categoryDefinitions.count()); assertEquals(terms, categoryDefinitions.results().get(0).getTerms()); } @@ -472,11 +464,10 @@ public class JobResultsProviderTests extends ESTestCase { Client client = getMockedClient(q -> qbHolder[0] = q, response); JobResultsProvider provider = createProvider(client); - @SuppressWarnings({"unchecked", "rawtypes"}) - QueryPage[] holder = new QueryPage[1]; + SetOnce> holder = new SetOnce<>(); InfluencersQuery query = new InfluencersQueryBuilder().from(from).size(size).includeInterim(false).build(); - provider.influencers(jobId, query, page -> holder[0] = page, RuntimeException::new, client); - QueryPage page = holder[0]; + provider.influencers(jobId, query, holder::set, e -> { throw new RuntimeException(e); }, client); + QueryPage page = holder.get(); assertEquals(2L, page.count()); String queryString = qbHolder[0].toString(); @@ -532,12 +523,11 @@ public class JobResultsProviderTests extends ESTestCase { Client client = getMockedClient(q -> qbHolder[0] = q, response); JobResultsProvider provider = createProvider(client); - @SuppressWarnings({"unchecked", "rawtypes"}) - QueryPage[] holder = new QueryPage[1]; + SetOnce> holder = new SetOnce<>(); InfluencersQuery query = new InfluencersQueryBuilder().from(from).size(size).start("0").end("0").sortField("sort") .sortDescending(true).influencerScoreThreshold(0.0).includeInterim(true).build(); - provider.influencers(jobId, query, page -> holder[0] = page, RuntimeException::new, client); - QueryPage page = holder[0]; + provider.influencers(jobId, query, holder::set, e -> { throw new RuntimeException(e); }, client); + QueryPage page = holder.get(); assertEquals(2L, page.count()); String queryString = qbHolder[0].toString(); @@ -587,10 +577,9 @@ public class 
JobResultsProviderTests extends ESTestCase { Client client = getMockedClient(qb -> {}, response); JobResultsProvider provider = createProvider(client); - @SuppressWarnings({"unchecked", "rawtypes"}) - QueryPage[] holder = new QueryPage[1]; - provider.modelSnapshots(jobId, from, size, r -> holder[0] = r, RuntimeException::new); - QueryPage page = holder[0]; + SetOnce> holder = new SetOnce<>(); + provider.modelSnapshots(jobId, from, size, holder::set, e -> { throw new RuntimeException(e); }); + QueryPage page = holder.get(); assertEquals(2L, page.count()); List snapshots = page.results(); @@ -608,7 +597,7 @@ public class JobResultsProviderTests extends ESTestCase { assertEquals(6, snapshots.get(1).getSnapshotDocCount()); } - public void testViolatedFieldCountLimit() throws Exception { + public void testViolatedFieldCountLimit() throws IOException { Map mapping = new HashMap<>(); int i = 0; diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/UpdateParamsTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/UpdateParamsTests.java index 2683c1131f5..a406136b8f6 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/UpdateParamsTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/UpdateParamsTests.java @@ -10,9 +10,9 @@ import org.elasticsearch.xpack.core.ml.job.config.DetectionRule; import org.elasticsearch.xpack.core.ml.job.config.JobUpdate; import org.elasticsearch.xpack.core.ml.job.config.ModelPlotConfig; import org.elasticsearch.xpack.core.ml.job.config.Operator; +import org.elasticsearch.xpack.core.ml.job.config.PerPartitionCategorizationConfig; import org.elasticsearch.xpack.core.ml.job.config.RuleCondition; -import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -21,14 +21,14 @@ public class UpdateParamsTests extends ESTestCase { public void testFromJobUpdate() { String 
jobId = "foo"; - DetectionRule rule = new DetectionRule.Builder(Arrays.asList( - new RuleCondition(RuleCondition.AppliesTo.ACTUAL, - Operator.GT, 1.0))).build(); - List rules = Arrays.asList(rule); + DetectionRule rule = new DetectionRule.Builder(Collections.singletonList( + new RuleCondition(RuleCondition.AppliesTo.ACTUAL, Operator.GT, 1.0))).build(); + List rules = Collections.singletonList(rule); List detectorUpdates = Collections.singletonList( new JobUpdate.DetectorUpdate(2, null, rules)); JobUpdate.Builder updateBuilder = new JobUpdate.Builder(jobId) .setModelPlotConfig(new ModelPlotConfig()) + .setPerPartitionCategorizationConfig(new PerPartitionCategorizationConfig()) .setDetectorUpdates(detectorUpdates); UpdateParams params = UpdateParams.fromJobUpdate(updateBuilder.build()); @@ -36,10 +36,12 @@ public class UpdateParamsTests extends ESTestCase { assertFalse(params.isUpdateScheduledEvents()); assertEquals(params.getDetectorUpdates(), updateBuilder.build().getDetectorUpdates()); assertEquals(params.getModelPlotConfig(), updateBuilder.build().getModelPlotConfig()); + assertEquals(params.getPerPartitionCategorizationConfig(), updateBuilder.build().getPerPartitionCategorizationConfig()); - params = UpdateParams.fromJobUpdate(updateBuilder.setGroups(Arrays.asList("bar")).build()); + params = UpdateParams.fromJobUpdate(updateBuilder.setGroups(Collections.singletonList("bar")).build()); assertTrue(params.isUpdateScheduledEvents()); + assertTrue(params.isJobUpdate()); } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/FieldConfigWriterTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/FieldConfigWriterTests.java index 0934d62f5c7..098295e29c8 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/FieldConfigWriterTests.java +++ 
b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/FieldConfigWriterTests.java @@ -16,6 +16,7 @@ import org.elasticsearch.xpack.core.ml.job.config.DetectionRule; import org.elasticsearch.xpack.core.ml.job.config.Detector; import org.elasticsearch.xpack.core.ml.job.config.MlFilter; import org.elasticsearch.xpack.core.ml.job.config.Operator; +import org.elasticsearch.xpack.core.ml.job.config.PerPartitionCategorizationConfig; import org.elasticsearch.xpack.core.ml.job.config.RuleCondition; import org.elasticsearch.xpack.ml.MachineLearning; import org.ini4j.Config; @@ -142,6 +143,24 @@ public class FieldConfigWriterTests extends ESTestCase { verifyNoMoreInteractions(writer); } + public void testWrite_GivenConfigHasPerPartitionCategorization() throws IOException { + Detector.Builder d = new Detector.Builder("metric", "Integer_Value"); + d.setByFieldName("mlcategory"); + d.setPartitionFieldName("event.dataset"); + + AnalysisConfig.Builder builder = new AnalysisConfig.Builder(Collections.singletonList(d.build())); + builder.setCategorizationFieldName("message"); + builder.setPerPartitionCategorizationConfig(new PerPartitionCategorizationConfig(true, false)); + analysisConfig = builder.build(); + writer = mock(OutputStreamWriter.class); + + createFieldConfigWriter().write(); + + verify(writer).write("detector.0.clause = metric(Integer_Value) by mlcategory partitionfield=\"event.dataset\" " + + "categorizationfield=message perpartitioncategorization=true\n"); + verifyNoMoreInteractions(writer); + } + public void testWrite_GivenConfigHasInfluencers() throws IOException { Detector.Builder d = new Detector.Builder("metric", "Integer_Value"); d.setByFieldName("ts_hash"); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/results/CategoryDefinitionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/results/CategoryDefinitionTests.java index de4a60e9851..c9c00e5b32d 100644 --- 
a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/results/CategoryDefinitionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/results/CategoryDefinitionTests.java @@ -24,6 +24,10 @@ public class CategoryDefinitionTests extends AbstractBWCSerializationTestCaseonly used for parsing C++ output. + */ public void testStrictParser() throws IOException { String json = "{\"job_id\":\"job_1\", \"foo\":\"bar\"}"; try (XContentParser parser = createParser(JsonXContent.jsonXContent, json)) { @@ -157,8 +170,9 @@ public class CategoryDefinitionTests extends AbstractBWCSerializationTestCase